# Table of contents

* [Overview](#Overview)
* [Setup](#Setup)
    * [Package installs](#Package-installs)
    * [Authentication and Google Cloud settings](#Authentication-and-Google-Cloud-settings)
    * [Imports](#Imports)
    * [Global variables](#Global-variables)
    * [Utils](#Utils)
    * [Updating searchable and retrievable fields](#Updating-searchable-and-retrievable-fields)
* [Data transformation steps](#Data-transformation-steps)
    * [Data analysis](#Data-analysis)
    * [Simple elements population](#Simple-elements-population)
    * [Brand extraction](#Brand-extraction)
    * [Description generation](#Description-generation)
    * [Tag generation](#Tag-Generation)
    * [Color attribute extraction](#Color-attribute-extraction)
    * [Create and populate enhanced product Table](#Create-and-Populate-Enhanced-Product-Table)
        * [Table cloning and schema extension](#Table-cloning-and-schema-extension)
        * [Data population with extended attributes](#Data-population-with-extended-attributes)
* [Verification of Data enriched catalog](#Verification-of-Data-enriched-catalog)
    * [Data loading](#Data-loading)
    * [Validation](#Validation)
* [Conclusion](#Conclusion)

# Overview

This lab provides a comprehensive guide to managing and enriching product data using Google Cloud services, primarily focusing on Vertex AI and BigQuery. You learn how to perform essential data operations, from initial setup and authentication to advanced data enrichment using generative AI.

**Scope:**
*   Fetching and analyzing product data from BigQuery.
*   Utilizing generative AI (Gemini models) for:
    *   Extracting brand names from product titles.
    *   Generating compelling product descriptions.
    *   Creating relevant product tags.
    *   Consolidating and normalizing product tags.
    *   Identifying color attributes (color families and specific colors).
*   Creating and populating an enhanced product table in BigQuery with the newly generated attributes.
*   Verifying the data enrichment process.
*   Uploading the enriched product data to the product catalog.
*   Validating the enriched data with the validation tool of Vertex AI Search for commerce.

**Key Learnings:**
Upon completing this lab, you will be able to:
*   Query and analyze product data stored in BigQuery.
*   Leverage generative AI models for various product data enrichment tasks, including attribute extraction and content generation.
*   Manage and update BigQuery table schemas and data, including creating tables, copying data, and merging updates.
*   Understand the practical application of AI in enhancing ecommerce product catalogs.

**Steps:**
*   **Setup**: You will prepare the Python environment, authenticate with Google Cloud, install required packages, and define global variables and utility functions. This ensures the notebook can interact with Google Cloud services and process data efficiently.
*   **Data analysis**: You will analyze the initial product data from BigQuery to understand its current state, schema, and identify missing attributes (e.g., brand, description, tags, detailed color information) that can be enriched.
*   **Data enrichment using generative AI**:
    *   You will perform preliminary token analysis on product titles to identify potential values for simpler attributes.
    *   You will populate `patterns`, `sizes`, and `audience` fields based on the token analysis.
    *   You will use the Gemini model to extract `brand` names from product titles.
    *   You will generate compelling `description` fields for products using their titles.
    *   You will generate product `tags` based on titles and descriptions, then consolidate these tags to create a more refined and manageable set.
    *   You will extract `colorInfo` (color families and specific colors) from product titles.
*   **Store enriched data in BigQuery**:
    *   You will create a new table (`products_enhanced`) in BigQuery by cloning the original product table.
    *   You will extend the schema of this new table to include the newly generated attributes.
    *   You will populate the new attributes in the `products_enhanced` table using the data generated in the enrichment phase (from the `df_products` DataFrame).
*   **Verification**:
    *   You will load the enriched product data from the `products_enhanced` BigQuery table into a new branch (Branch 2) of the Vertex AI Search for commerce catalog.
    *   You will validate the data enrichment by performing search queries against both the original catalog (Branch 0) and the enriched catalog (Branch 2) using the Vertex AI Search for commerce console, observing differences in search results and facet availability.



# Setup
Get started by preparing your environment. Begin with authentication and configuration, which will be required for all subsequent API calls.

## Package installs
Install all required Python packages. Run this cell only once after starting a new kernel.

In [None]:
%pip install google google-cloud-retail google-cloud-storage google-cloud-bigquery pandas
%pip install google-cloud-aiplatform google-genai
%pip install google-cloud-bigquery-storage pyarrow tqdm bigquery-magics
%pip install google-cloud-bigquery[pandas] jupyterlab
%pip install fsspec gcsfs
%pip install matplotlib seaborn plotly
%pip install --upgrade ipython-sql

## Authentication and Google Cloud settings
Before you can interact with the Retail API, you must authenticate with Google Cloud and set up your project context. This ensures all API calls are authorized and associated with the correct Google Cloud project. If authentication fails, you'll be prompted to log in interactively. The `project_id` variable will be used throughout the notebook.

**About `project_id` and Application Default Credentials (ADC):**

- **`project_id`**: This uniquely identifies your Google Cloud project. All API requests, resource creation, and billing are tied to this project. Setting the correct `project_id` ensures your operations are performed in the intended environment and resources are properly tracked.

- **Application Default Credentials (ADC)**: ADC is a mechanism that allows your code to automatically find and use your Google Cloud credentials. Running the `gcloud auth application-default login` command sets up ADC by generating credentials that client libraries (like the Retail API) can use to authenticate API calls on your behalf.

**Why this matters**  
Proper authentication and project selection are essential for secure, authorized access to Google Cloud resources. Without these, API calls will fail or may affect the wrong project. ADC simplifies credential management, especially in development and notebook environments.

In [None]:
import subprocess

try:
  # Try to get an access token
  subprocess.check_output(
    ['gcloud', 'auth', 'application-default', 'print-access-token'],
    stderr=subprocess.STDOUT
  )
  print("Already authenticated with Application Default Credentials.")
except subprocess.CalledProcessError:
  # If it fails, prompt for login
  print("No valid ADC found. Running interactive login...")
  !gcloud auth application-default login

## Imports
Import all necessary libraries for API access, data analysis, and visualization.

In [None]:
from google.cloud.retail_v2 import SearchServiceClient, ProductServiceClient, PredictionServiceClient
from google.cloud.retail_v2.types import product, search_service, ListProductsRequest, SearchRequest, PredictRequest, UserEvent
from google.protobuf.field_mask_pb2 import FieldMask
from google.protobuf.json_format import MessageToDict
import pandas as pd
import http.client as http_client
import logging
import re
from IPython.display import display_html
from matplotlib import pyplot as plt
import seaborn as sns

# enabling BigQuery magics
%load_ext bigquery_magics

# configuring default options for pandas
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
# pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)

## Global variables
With authentication complete, you define some key variables that will be used in all your API calls. These include resource names and placements, which specify the context for search and recommendation requests.

**What is a 'placement'?**  
A placement is a configuration resource in the Retail API that determines how and where a model is used for serving search or recommendation results. Placements define the context (such as search, browse, or recommendation) and can be customized for different pages or user experiences.

**Why might you have multiple placements or branches?**  
- You may have different placements for various parts of your site or app, such as a homepage recommendation carousel, a category browse page, or a personalized search bar.
- Multiple branches allow you to manage different versions of your product catalog (e.g., staging vs. production, or A/B testing different product sets).

**Example scenarios**
- Using a "default_search" placement for general product search, and a "recently_viewed_default" placement for showing users their recently viewed items.
- Having separate branches for testing new product data before rolling it out to all users, or for running experiments with different recommendation models.
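
The placement and branch resource names used in the next cell follow a fixed pattern. As a minimal sketch, the hypothetical helpers `placement_path` and `branch_path` below (they are not part of the Retail client library) build the same strings, with `my-project` standing in for a real project ID:

```python
# Hypothetical helpers for illustration only; the notebook builds these
# strings inline with f-strings.
CATALOG_TEMPLATE = "projects/{project_id}/locations/global/catalogs/default_catalog"


def placement_path(project_id: str, placement_id: str) -> str:
    """Return the full resource name of a serving placement."""
    catalog = CATALOG_TEMPLATE.format(project_id=project_id)
    return f"{catalog}/placements/{placement_id}"


def branch_path(project_id: str, branch_id: int) -> str:
    """Return the full resource name of a catalog branch."""
    catalog = CATALOG_TEMPLATE.format(project_id=project_id)
    return f"{catalog}/branches/{branch_id}"


# "my-project" is a placeholder project ID.
print(placement_path("my-project", "default_search"))
print(branch_path("my-project", 0))  # original catalog
print(branch_path("my-project", 2))  # branch used later for the enriched catalog
```

Only the final path segment differs between branches, so switching a request from the original catalog to the enriched one comes down to changing the branch ID.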

In [None]:
import google.auth
import google.auth.exceptions

# Authenticate with Google Cloud and get the default project ID
try:
  credentials, project_id = google.auth.default()
  print(f"Using project ID: {project_id}")
  !gcloud auth application-default set-quota-project {project_id}
except google.auth.exceptions.DefaultCredentialsError:
  print("Google Cloud Authentication failed. Please configure your credentials.")
  print("You might need to run 'gcloud auth application-default login'")
  project_id = None # Set to None or a default
  
SCRIPTS_BUCKET = "artilekt-vaisc-csb_scripts"

# Define the default placement for search and recommendations
DEFAULT_SEARCH = (
  f"projects/{project_id}/locations/global/catalogs/default_catalog/"
  "placements/default_search" # Use default_search unless you have a specific browse placement
)
RECENTLY_VIEWED_DEFAULT = (
  f"projects/{project_id}/locations/global/catalogs/default_catalog/"
  "placements/recently_viewed_default"
)
DEFAULT_BRANCH = f"projects/{project_id}/locations/global/catalogs/default_catalog/branches/0"

ORIGINAL_PRODUCTS_TABLE = "retail.products"
ENHANCED_PRODUCTS_TABLE = "retail.products_enhanced"

## Utils
To make your analysis easier, you use some utility functions for data conversion and HTTP logging. These help you convert API responses to Pandas DataFrames for analysis and enable detailed logging for troubleshooting.

In [None]:
import pandas as pd
from google.protobuf.json_format import MessageToDict

pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
# pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)

def json2df(products_list):
  if products_list:
    products_dicts = [dict(sorted(MessageToDict(p._pb).items())) for p in products_list]
    df = pd.json_normalize(products_dicts)
    return df
  else:
    print("No products returned or an error occurred.")
    return pd.DataFrame()

from contextlib import contextmanager

@contextmanager
def http_logging(log_http: bool):
    """
    Context manager to enable/disable HTTP logging for Google API clients.
    Usage:
        with http_logging(log_http):
            # code that needs HTTP logging
    """
    import http.client as http_client
    import logging
    root_logger = logging.getLogger()
    original_http_debuglevel = http_client.HTTPConnection.debuglevel
    original_log_level = root_logger.level
    try:
        if log_http:
            print("\n--- [INFO] Enabling HTTP Logging ---")
            logging.basicConfig()
            root_logger.setLevel(logging.DEBUG)
            http_client.HTTPConnection.debuglevel = 1
            # http.client debug output only covers clients that use REST (HTTP/1.1) transport.
            print("--- [INFO] Only REST transport traffic is logged. ---")
        yield
    finally:
        if log_http:
            http_client.HTTPConnection.debuglevel = original_http_debuglevel
            root_logger.setLevel(original_log_level)
            print("--- [INFO] HTTP Logging & Root Log Level Restored ---")

from google.cloud.bigquery import SchemaField
def flatten_schema_fields(schema, prefix=""):
    """Flattens the BigQuery schema, expanding RECORD types."""
    fields_data = []
    for field in schema:
        field_name_full = f"{prefix}{field.name}"
        if field.field_type == 'RECORD' and field.fields:
            fields_data.append({
                "Field Name": field_name_full,
                "Data Type": field.field_type, # or "STRUCT"
                "Mode": field.mode
            })
            fields_data.extend(flatten_schema_fields(field.fields, prefix=f"{field_name_full}."))
        else:
            fields_data.append({
                "Field Name": field_name_full,
                "Data Type": field.field_type,
                "Mode": field.mode
            })
    return fields_data


# Initialize Vertex AI
from google import genai
import json

def gemini_run_query(contents, model_name="gemini-2.0-flash"):
    """
    Runs a query against a specified Gemini model and expects a JSON response.

    This function initializes a new genai.Client for each call, configured
    with the global `project_id` and "us-central1" location. It sends the
    provided `contents` to the specified `model_name`. The model is expected
    to return a response in JSON format.

    Args:
        contents: The prompt or content to send to the Gemini model.
                  This should be structured according to the model's requirements.
        model_name: The name of the Gemini model to use (e.g., "gemini-2.0-flash").
                    Defaults to "gemini-2.0-flash".
        
    Returns:
        A Python dictionary parsed from the model's JSON response if successful.
        Returns None if an error occurs during the API call or JSON parsing.
    """
    # Default config for JSON response
    effective_config = {"response_mime_type": "application/json"}

    # Initialize the Gemini client. 
    # Note: `project_id` should be globally defined and available in this scope.
    gemini = genai.Client(project=project_id, location="us-central1", vertexai=True)

    try:
        response = gemini.models.generate_content( 
            model=model_name, 
            contents=contents,
            config=effective_config,
        )
        # Parse the JSON response
        respText = response.text.strip()
        # print(f"Response text: {respText}") # Uncomment for debugging model responses
        # Convert response to JSON
        return json.loads(respText)
    except Exception as e:
        print(f"Error in gemini_run_query (model: {model_name}, contents starting with: '{str(contents)[:100]}...'): {e}")
        return None # Return None on error
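
To see what the schema flattening produces without calling BigQuery, here is a minimal sketch that applies the same recursion to a stand-in class. `FakeField` and `flatten_names` are hypothetical names introduced for illustration; `FakeField` mimics only the `name`, `field_type`, `mode`, and `fields` attributes that `flatten_schema_fields` reads.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass
class FakeField:
    """Stand-in for bigquery.SchemaField with only the attributes used here."""
    name: str
    field_type: str
    mode: str = "NULLABLE"
    fields: Tuple["FakeField", ...] = ()


def flatten_names(schema, prefix=""):
    """Return dotted field names, recursing into RECORD fields."""
    names = []
    for field in schema:
        full_name = f"{prefix}{field.name}"
        names.append(full_name)
        if field.field_type == "RECORD" and field.fields:
            names.extend(flatten_names(field.fields, prefix=f"{full_name}."))
    return names


# A made-up two-field schema with one nested record.
schema = [
    FakeField("id", "STRING", "REQUIRED"),
    FakeField("colorInfo", "RECORD", fields=(
        FakeField("colorFamilies", "STRING", "REPEATED"),
        FakeField("colors", "STRING", "REPEATED"),
    )),
]
flat_names = flatten_names(schema)
print(flat_names)
```

The record itself is listed first, followed by its children with a dotted prefix, which is the naming that the later token analysis relies on (for example `colorInfo.colors`).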

## Updating searchable and retrievable fields

The Retail API allows you to specify which fields in your product catalog are searchable, indexable, and retrievable. Attribute configuration settings will impact search and recommendations behavior across your site.

Search uses the following attribute settings:

* **Indexable:** Search can filter and facet using this attribute.
* **Dynamic faceting:** Search can automatically use this attribute as a dynamic facet based on past user behavior such as facet clicks and views. To enable dynamic faceting for an attribute, Indexable must be set to true for that attribute.
* **Searchable:** This attribute is searchable by search queries, which increases recall for that attribute. This control is applicable only for text attributes.
* **Retrievable:** If set to true, search returns this attribute in responses to search queries. If all attributes have Retrievable set to false, the search results contain only the product name or (for variants) the product name and color information.

It is recommended that you retrieve only the fields you need for your application to optimize performance and reduce costs. For the purpose of this lab, you make all fields searchable and retrievable. This is not recommended for production use, but it will simplify demonstration of the effects some of the advanced search configurations have on the results.

Note that changes to the indexable, searchable, and retrievable settings take effect immediately upon your next full catalog ingestion; otherwise they can take 12 hours or more. Changes to dynamic faceting, tiling, and exact match take effect within 2 days. Changes to the filterable setting apply only to filter tag generation for recommendations and can take 12 hours or more to take effect.

You explore this configuration and effects of these settings in another lab. For the purpose of this lab, set all attributes to searchable and retrievable. This allows you to see the effects of the generative AI enrichment on the search results.
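
The rules that the next cell applies to each attribute can be previewed offline. The sketch below is a simplified, hypothetical mirror of that logic: `plan_attribute` and the two small sets are illustrative stand-ins, not Retail API objects, and the real cell uses longer lists and the API's enum values.

```python
# Hypothetical subsets of the lists used in the next cell.
SEARCHABLE_PREDEFINED = {"brands", "description", "title"}
RETRIEVABLE = {"brands", "price", "title", "attributes.style"}


def plan_attribute(name: str, is_textual: bool) -> dict:
    """Return the options the rules would assign to one attribute."""
    is_custom = name.startswith("attributes.")
    return {
        # Every attribute is made indexable in this lab.
        "indexable": True,
        # Predefined text attributes, plus custom attributes of textual type.
        "searchable": name in SEARCHABLE_PREDEFINED or (is_custom and is_textual),
        # Inventory attributes are never retrievable; others must be listed.
        "retrievable": not name.startswith("inventories.") and name in RETRIEVABLE,
    }


for name, is_textual in [
    ("title", True),
    ("price", False),
    ("attributes.style", True),
    ("inventories.price", False),
]:
    print(name, plan_attribute(name, is_textual))
```

Keeping the decision in a pure function like this makes it easy to review the planned configuration before sending an update request.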

In [None]:
import copy
from google.cloud.retail_v2.types import CatalogAttribute, AttributesConfig, UpdateAttributesConfigRequest
from google.cloud.retail_v2 import CatalogServiceClient

catalog_service_client = CatalogServiceClient()
catalog_name = f"projects/{project_id}/locations/global/catalogs/default_catalog/attributesConfig"
attributes_config = catalog_service_client.get_attributes_config(name=catalog_name)

# Allowed predefined textual attributes for searchable_option
ALLOWED_SEARCHABLE_PREDEFINED = {
    "ageGroups", "brands", "categories", "colorFamilies", "conditions",
    "description", "genders", "materials", "patterns", "sizes", "title"
}

# Define a list of important attributes to be made retrievable (max 30, must be allowed by Retail API)
IMPORTANT_RETRIEVABLE_ATTRIBUTES = [
  "ageGroups",
  "availability",
  "brands",
  "categories",
  "colorFamilies",
  "colors",
  "conditions",
  "cost",
  "currencyCode",
  "description",
  "discount",
  "genders",
  "gtin",
  "images",
  "materials",
  "name",
  "originalPrice",
  "patterns",
  "price",
  "productId",
  "rating",
  "ratingCount",
  "ratingHistogram",
  "sizes",
  "title",
  "uri",
  # Custom attributes below (keep at the bottom)
  "attributes.collection",
  "attributes.ecofriendly",
  "attributes.material",
  "attributes.style",
]

for attr_name, attr in attributes_config.catalog_attributes.items():
    attr.indexable_option = CatalogAttribute.IndexableOption.INDEXABLE_ENABLED
    
    # Set retrievable_option based on importance and inventory status
    if attr_name.startswith("inventories."):
        attr.retrievable_option = CatalogAttribute.RetrievableOption.RETRIEVABLE_DISABLED
    elif attr_name in IMPORTANT_RETRIEVABLE_ATTRIBUTES:
        attr.retrievable_option = CatalogAttribute.RetrievableOption.RETRIEVABLE_ENABLED
    else:
        attr.retrievable_option = CatalogAttribute.RetrievableOption.RETRIEVABLE_DISABLED
        
    # Only set searchable_option=ENABLED for allowed attributes
    if attr_name in ALLOWED_SEARCHABLE_PREDEFINED:
        attr.searchable_option = CatalogAttribute.SearchableOption.SEARCHABLE_ENABLED
    elif attr_name.startswith("attributes.") and attr.type_ == CatalogAttribute.AttributeType.TEXTUAL:
        attr.searchable_option = CatalogAttribute.SearchableOption.SEARCHABLE_ENABLED
    else:
        attr.searchable_option = CatalogAttribute.SearchableOption.SEARCHABLE_DISABLED

# Prepare update request
update_req = UpdateAttributesConfigRequest(
    attributes_config=attributes_config,
    update_mask=None  # None means update all fields
)

# Update the config in the API
updated_config = catalog_service_client.update_attributes_config(request=update_req)

Now that the attributes have been updated, purge catalog Branch 0 to ensure that the changes take effect immediately. This removes all existing products from the index and triggers a full reindexing with the new attribute settings.

In [None]:
from google.cloud.retail_v2 import ProductServiceClient
from google.cloud.retail_v2.types import PurgeProductsRequest

# Use the DEFAULT_BRANCH defined in the Global variables section
branch = DEFAULT_BRANCH

purge_request = PurgeProductsRequest(
  parent=branch,
  filter='*',  # Purge all products
  force=True   # Actually perform the purge (not a dry run)
)

product_service_client = ProductServiceClient()
operation = product_service_client.purge_products(request=purge_request)
print("Purging catalog... This may take a few minutes.")
result = operation.result()
print("Purge operation completed.")
print(result)

> **⚠️ It may take 2-5 minutes for the background process to reload the catalog.** You do not need to wait for this process to complete before proceeding with the rest of the lab.

# Data transformation steps
You now proceed with the data transformation steps. 
- First, you analyze the existing product data loaded into Branch 0 of the product catalog. You identify any gaps or issues in the data. 
- Then you see how to address these gaps using data enrichment techniques powered by generative AI models.
- Finally, you create a new table in BigQuery with the enriched product data and validate it using the Vertex AI Search for commerce validation tool.

## Data analysis
Analyze the existing product data loaded into Branch 0 of the product catalog. The data was imported from the `products` table in the `retail` BigQuery dataset. The next cell shows the schema and a sample of the data. You can also view the data in the [BigQuery Console](https://console.cloud.google.com/bigquery).

In [None]:
from google.cloud import bigquery
import pandas as pd # Ensure pandas is imported for DataFrame creation

client = bigquery.Client(project=project_id) 

TABLE_ID = f"{project_id}.retail.products"

try:
    table = client.get_table(TABLE_ID)  # Make an API request.
    print(f"Schema for table {TABLE_ID}:")

    # Prepare data for DataFrame using the new flattening function
    schema_data = flatten_schema_fields(table.schema)
    
    df_schema = pd.DataFrame(schema_data)
    display(df_schema)
    
    print(f"\n\nSample data from {TABLE_ID} (first 50 rows):")



    # Query to fetch product titles and IDs
    query = f"""
    SELECT 
        *
    FROM `{project_id}.retail.products`
    WHERE title IS NOT NULL
    LIMIT 50
    """

    print(f"Fetching product data from {project_id}.retail.products...")
    sample_products = client.query(query).to_dataframe()
    print(f"Fetched {len(sample_products)} products")
    display(sample_products)


    # Populate df_products DataFrame with distinct product titles. 
    # This DataFrame will be used later to add AI-generated information back to the products.
    print(f"Fetching distinct product titles from {project_id}.retail.products...")
     # Query to fetch distinct product titles
    query = f"""
    SELECT 
        DISTINCT title
    FROM `{project_id}.retail.products`
    WHERE title IS NOT NULL
    """
    df_products = client.query(query).to_dataframe()

except Exception as e:
    print(f"Error fetching schema or data for table {TABLE_ID}: {e}")

Some important fields that affect the ranking and searchability of the products are not populated (such as description). You might also notice that some fields are missing entirely from this schema, such as brand and tags. Here is the full schema describing all possible fields that can be populated in the product catalog. (You can also view it in the [BigQuery Console](https://console.cloud.google.com/bigquery), in the `retail.products_tmpl` table.)

In [None]:
from google.cloud import bigquery
import pandas as pd # Ensure pandas is imported for DataFrame creation

client = bigquery.Client(project=project_id) 

TABLE_ID_TMPL = f"{project_id}.retail.products_tmpl"

try:
    table_tmpl = client.get_table(TABLE_ID_TMPL)  # Make an API request.
    print(f"Schema for table {TABLE_ID_TMPL}:")

    # Prepare data for DataFrame using the new flattening function
    schema_data_tmpl = flatten_schema_fields(table_tmpl.schema)
    
    df_schema_tmpl = pd.DataFrame(schema_data_tmpl)
    display(df_schema_tmpl)

except Exception as e:
    print(f"Error fetching schema or data for table {TABLE_ID_TMPL}: {e}")

Even though `id`, `title`, and `category` are the only mandatory fields needed to load catalog data, it is highly desirable to populate as many additional fields as possible to improve the searchability and ranking of the products. Some are more important than others, such as `description`, `brand`, `tags`, `color families`, and `target audience`.

If you look at the fields in the original product data, you will notice that the `title` field is the most informative one and might contain enough information to populate additional fields for at least some of the products. There is also a valid reference to an `image` URI. In this lab, you focus on text fields because they are simple to work with and require no additional data sources. Various ways to leverage `image` data are discussed at the end of the lab.

Analyze tokens in the `title` field and see if you can categorize them against the fields you want to populate. Generative AI models are well suited to analyzing unstructured text because they can extract structured information from it. You use the Gemini model for this purpose.
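
Before sending anything to the model, it helps to see what the tokens look like. The sketch below applies the same regular expression as the next cell to a few made-up titles (the lab reads real titles from BigQuery):

```python
import re
from collections import Counter

# Made-up titles for illustration only.
sample_titles = [
    "Nike Men's Running Shoes - Red",
    "Adidas Striped Cotton T-Shirt, Size M",
    "Nike Kids Running Shorts Blue",
]

tokens = []
for title in sample_titles:
    # Lowercase the title, then keep runs of word characters.
    tokens.extend(re.findall(r"\b\w+\b", title.lower()))

token_counts = Counter(tokens)
print(tokens)
print(token_counts.most_common(5))
```

Note that apostrophes and hyphens act as separators, so "Men's" becomes `men` and `s`, and "T-Shirt" becomes `t` and `shirt`. Short fragments like these tend to end up uncategorized, which is one reason the categorization results need a manual review.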

In [None]:
from google.cloud import bigquery
import pandas as pd
from collections import Counter
import json
import time
from tqdm import tqdm
import re

# The 'gemini_run_query' function (defined in an earlier cell, e.g., Cell 13) 
# initializes its own Gemini client. No separate client initialization is needed here for it.

# BigQuery client
client_bq = bigquery.Client(project=project_id) 

# 1. Fetch product titles
print("Fetching product titles...")
query_titles = f"SELECT DISTINCT title FROM `{project_id}.retail.products` WHERE title IS NOT NULL"
df_titles = client_bq.query(query_titles).to_dataframe()
print(f"Fetched {len(df_titles)} titles.")

# 2. Fetch target categories from products_tmpl schema
# Assuming df_schema_tmpl is already defined and populated from a previous cell (cell 17)
print("\nUsing pre-existing categories from df_schema_tmpl (from cell 17)...")
target_categories = df_schema_tmpl['Field Name'].tolist()
# Filter out categories that are too generic or complex for simple token mapping if needed
# This list can be refined based on desired output
target_categories = [
    cat for cat in target_categories 
    if not ('.' in cat and not any(sub in cat for sub in ['attributes', 'audience', 'colorInfo', 'priceInfo', 'fulfillmentInfo', 'promotion'])) and 
       cat not in ['name', 'id', 'uri', 'title', 'categories', 'primaryProductId', 'description', 'languageCode', 'gtin', 'retrievableFields', 'expireTime', 'ttl', 'collectionId', 'image']
]
print(f"Using {len(target_categories)} target categories for token mapping: {target_categories}")

# 3. Tokenize titles
print("\nTokenizing titles...")
all_tokens_list = [] 
for title_text in df_titles['title']:
    # Simple tokenization: lowercase and split by non-alphanumeric characters, keeping words
    tokens = re.findall(r'\b\w+\b', str(title_text).lower())
    all_tokens_list.extend(tokens)

unique_tokens_list = list(set(all_tokens_list))
print(f"Found {len(all_tokens_list)} total tokens, {len(unique_tokens_list)} unique tokens.")

# 4. Categorize tokens with Gemini
def categorize_tokens_with_ai(tokens_batch, categories_list):
    prompt = f"""
    You are an expert in e-commerce product data categorization.
    Given a list of product attribute names (categories) and a list of tokens (words from product titles),
    your task is to map each token to the categories from the provided list.

    Categories to use for mapping:
    {json.dumps(categories_list)}

    Rules:
    1. For each token, identify the best matching categories from the list.
    2. A token can belong to multiple categories.
    3. If a token does not clearly fit into any of the provided categories, categorize it as "Uncategorized".
    4. Return the result as a JSON map where the key is the token and the value is the assigned category string.
    5. Be precise. For example, if categories include "attributes.brand" and "attributes.color", a token like "Nike" should map to "attributes.brand", and "red" to "attributes.color".
    6. If a token seems like a general descriptor not fitting a specific attribute (e.g., "new", "sale", "for"), map it to "Uncategorized".

    Tokens to categorize:
    {json.dumps(tokens_batch)}
    """
    # Call the generic gemini_run_query function
    categorized_map = gemini_run_query(
            contents=prompt,

    )

    if categorized_map is None: # Error occurred in gemini_run_query
            # gemini_run_query already prints a detailed error.
            # This function needs to return the specific error structure it promised.
            print(f"Error categorizing batch (tokens starting with '{tokens_batch[0] if tokens_batch else ''}'). AI query failed.")
            return {token: "Error" for token in tokens_batch}

    return categorized_map

print("\nCategorizing tokens with Gemini...")
batch_size_categorization = 150 # Adjusted batch size to manage prompt length and complexity
master_token_category_map = {}

if unique_tokens_list:
    for i in tqdm(range(0, len(unique_tokens_list), batch_size_categorization)):
        current_tokens_batch = unique_tokens_list[i:i+batch_size_categorization]
        batch_map = categorize_tokens_with_ai(current_tokens_batch, target_categories)
        master_token_category_map.update(batch_map) # batch_map will be {token:"Error",...} or {} on error, or valid map
        time.sleep(1.5) # Increased sleep time slightly for API rate limits with potentially larger prompts
else:
    print("No unique tokens to categorize.")

print(f"Finished categorizing tokens. {len(master_token_category_map)} unique tokens processed.")

# 5. Aggregate results
print("\nAggregating results...")
# Initialize with all target categories plus Uncategorized and Error
category_token_counts = {cat: Counter() for cat in target_categories + ["Uncategorized", "Error"]}

for token_item in all_tokens_list: # Iterate through all tokens (with duplicates) to get correct counts
    # Get the category for the token, defaulting to "Uncategorized" if not in map
    category_for_token = master_token_category_map.get(token_item, "Uncategorized")

    # "Error" is already a tracked bucket, so failed tokens are counted there.
    # Anything else that is not a tracked category string (an unexpected name, or a
    # list when the model assigns several categories) falls back to "Uncategorized".
    if not isinstance(category_for_token, str) or category_for_token not in category_token_counts:
        category_for_token = "Uncategorized"

    category_token_counts[category_for_token][token_item] += 1
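
The prompt allows a token to belong to multiple categories, while the aggregation expects one category string per token, so the parsed response may not always have the expected shape. As a minimal sketch, a hypothetical helper such as `normalize_category_map` could coerce each batch result before it is merged. Keeping only the first category of a list is a simplifying assumption, not part of the lab.

```python
def normalize_category_map(raw_map, allowed_categories):
    """Coerce a parsed model response into a {token: category} dict of strings."""
    if not isinstance(raw_map, dict):
        # The model returned something other than a JSON object.
        return {}
    normalized = {}
    for token, value in raw_map.items():
        # If the model returned several categories, keep the first one.
        if isinstance(value, list):
            value = value[0] if value else "Uncategorized"
        # Unknown names and non-string values fall back to "Uncategorized".
        if not isinstance(value, str) or value not in allowed_categories:
            value = "Uncategorized"
        normalized[str(token)] = value
    return normalized


# Made-up response illustrating the shapes a model might return.
allowed = {"brands", "sizes", "patterns", "Uncategorized", "Error"}
raw = {
    "nike": "brands",
    "striped": ["patterns", "materials"],
    "new": "misc",
    "xl": None,
}
print(normalize_category_map(raw, allowed))
```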


In [None]:

# 6. Format output
print("\nSummary of token categorization:")
summary_data_list = [] # Renamed to avoid conflict
for category_name, token_counter_obj in category_token_counts.items():
    if not token_counter_obj: # Skip categories with no tokens assigned
        continue
    total_tokens_in_category = sum(token_counter_obj.values())
    # Get (token, count) for top 50, then extract just the token string
    tokens_list = [item[0] for item in token_counter_obj.most_common(50)]
    summary_data_list.append({
        "category": category_name,
        "token_count": total_tokens_in_category,
        "tokens_list": tokens_list
    })

# Sort by token count in descending order
df_summary_output = pd.DataFrame(summary_data_list)
if not df_summary_output.empty:
    df_summary_output = df_summary_output.sort_values(by="token_count", ascending=False).reset_index(drop=True)
    print("Category, Token Count, [Top 50 Tokens]")
    display(df_summary_output)
else:
    print("No data to summarize.")

print("\nToken analysis and categorization complete.")


Some of the categorized tokens can be ignored (such as 'type' and 'Uncategorized'), while others can be used to populate the fields you want to enrich. For example, tokens such as 'Nike', 'Adidas', and 'Puma' can populate the `brands` field, while tokens such as 'running', 'sports', and 'casual' can populate the `tags` field. Field types that seem to contain useful information are:
- `colorInfo.colors`
- `brands`
- `audience`
- `materials`
- `patterns`
- `sizes`

Now that you understand the existing product data and the fields you want to enrich, proceed with the data enrichment steps. You use generative AI models to extract structured information from the `title` field and populate some of the fields in the product catalog:
- `colorInfo`
- `brands`
- `description`
- `tags`

But first, you populate the fields whose values the preliminary token-based analysis already seems to have identified correctly. These fields are:
- `patterns`
- `sizes`
- `audience`

## Simple elements population

From the initial analysis of the product titles, you have already extracted tokens that can populate the `patterns`, `sizes`, and `audience` fields. You now populate these fields by matching each word of a title against the token lists.

In [None]:
# print tokens for patterns, sizes, audiences
display(df_summary_output[df_summary_output['category'].str.contains('pattern|size|audience.', regex=True, case=False)])

# populate patterns, sizes and audiences in df_products if any word in the tokens_list is found in the title

# Helper function to get all tokens for a given category keyword
def get_tokens_for_category(df_summary, category_keyword):
    relevant_rows = df_summary[df_summary['category'].str.contains(category_keyword, regex=True, case=False)]
    all_tokens = set()
    for tokens_list in relevant_rows['tokens_list']:
        for token in tokens_list:
            all_tokens.add(str(token).lower())
    return list(all_tokens)

# Get dynamic lists of tokens
pattern_tokens = get_tokens_for_category(df_summary_output, 'pattern')
size_tokens = get_tokens_for_category(df_summary_output, 'size')
# For audience, you might want to be more specific if 'audience.' matches too broadly
# or combine 'audience.genders' and 'audience.ageGroups'
audience_gender_tokens = get_tokens_for_category(df_summary_output, 'audience.genders')
audience_age_tokens = get_tokens_for_category(df_summary_output, 'audience.ageGroups')
audience_tokens = {
    "genders": audience_gender_tokens,
    "ageGroups": audience_age_tokens
}

print(f"Pattern tokens: {pattern_tokens}")
print(f"Size tokens: {size_tokens}")
print(f"Audience tokens: {audience_tokens}")

def extract_patterns(title):
    patterns = []
    # Ensure pattern_tokens is not empty before proceeding
    if not pattern_tokens:
        return None
    for token in title.split():
        if token.lower() in pattern_tokens:
            patterns.append(token)
    return ', '.join(patterns) if patterns else None

def extract_sizes(title):
    sizes = []
    # Ensure size_tokens is not empty
    if not size_tokens:
        return None
    for token in title.split():
        if token.lower() in size_tokens:
            sizes.append(token)
    return ', '.join(sizes) if sizes else None

def extract_audiences(title):
    """Extract audience information including genders and ageGroups from title"""
    result = {
        "genders": [],
        "ageGroups": []
    }
    
    # Ensure audience_tokens is not empty and has expected structure
    if not audience_tokens or not isinstance(audience_tokens, dict):
        return None
    
    title_tokens = [token.lower() for token in title.split()]
    
    # Check for gender tokens
    if "genders" in audience_tokens and audience_tokens["genders"]:
        for token in title_tokens:
            if token in audience_tokens["genders"]:
                result["genders"].append(token)
    
    # Check for age group tokens
    if "ageGroups" in audience_tokens and audience_tokens["ageGroups"]:
        for token in title_tokens:
            if token in audience_tokens["ageGroups"]:
                result["ageGroups"].append(token)
    
    # Return None if no audience data found, otherwise return the structured result
    if not result["genders"] and not result["ageGroups"]:
        return None
    
    return result


df_products['pattern'] = df_products['title'].apply(lambda x: extract_patterns(x))
df_products['size'] = df_products['title'].apply(lambda x: extract_sizes(x))
df_products['audience'] = df_products['title'].apply(lambda x: extract_audiences(x))

Let's validate the results and display a few products with populated fields.

In [None]:

display(df_products[
    df_products['pattern'].notna() | 
    df_products['size'].notna() | 
    df_products['audience'].notna()
][['title', 'pattern', 'size', 'audience']].head(10))
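
The helpers above split each title on whitespace, so a word followed by punctuation (for example `Striped,`) would not match its token. The following is a minimal sketch of a punctuation-tolerant variant, not part of the original lab. The function name `match_tokens` and the example token list are hypothetical.

```python
import string

def match_tokens(title, known_tokens):
    """Return title words (with surrounding punctuation stripped) found in known_tokens."""
    known = {t.lower() for t in known_tokens}
    matches = []
    for word in title.split():
        # Strip leading/trailing punctuation such as commas or parentheses
        cleaned = word.strip(string.punctuation)
        if cleaned and cleaned.lower() in known:
            matches.append(cleaned)
    return matches

# Hypothetical token list, for illustration only
example_pattern_tokens = ["striped", "floral"]
print(match_tokens("Men's Striped, Cotton Shirt (Floral)", example_pattern_tokens))
```

Using a set for the lookup also keeps each membership test constant-time, which matters when the token lists grow.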

## Brand extraction

Now you use Vertex AI's generative AI capabilities to extract brand information from product titles. You use the Gemini model to analyze titles and identify potential brands.

In [None]:
from google import genai
from pydantic import BaseModel
import json
import time


from tqdm import tqdm



def extract_brands_with_ai(titles_batch, suggested_brands):
    """
    Extract brands from a batch of product titles using generative AI,
    leveraging a list of suggested brands from prior tokenization.
    """
    
    # Create prompt for brand extraction
    prompt = f"""
    You are an expert in e-commerce product data analysis. Your task is to extract brand names from product titles.

    Consider these potential brand names, which were identified during an initial analysis of product titles. Use them as strong suggestions:
    {json.dumps(suggested_brands)}

    Rules:
    1. Extract only actual brand names, not product types or categories.
    2. Use the provided list of potential brand names as strong suggestions. However, also identify other brands if they are clearly present in the title but not on the suggestion list.
    3. Return results as a JSON map (NOT array of maps) where the key is the original title and the value is an array of identified brand names (e.g., {{"title": ["BrandA", "BrandB"]}}).
    4. If a title contains a single brand, return that brand as a single element within an array (e.g., {{"title": ["BrandA"]}}).
    5. If no brand is identifiable in a title, return "Unknown" as a single element in an array for that title (e.g., {{"title": ["Unknown"]}}).
    6. If a title contains multiple brands, return all identified brands in an array, in the order they appear in the title (e.g., {{"title": ["BrandX", "BrandY"]}}).
    7. Be consistent with brand name formatting. If a brand matches one from the suggested list, prefer the casing from that list. Otherwise, use standard capitalization (e.g., "Google", not "google").
    8. Ignore generic terms, product types, or attributes that are not brand names.
    
    Product titles to analyze:
    {chr(10).join([f"{i+1}. {title}" for i, title in enumerate(titles_batch)])}
    """
    
    try:
        # print(prompt) # Uncomment for debugging the prompt
        # Call the gemini_run_query function
        brands_map = gemini_run_query(
            contents=prompt,
        )
        # print(f"brands: {brands_map}") # Uncomment for debugging the raw response from Gemini
        if brands_map is None: # Error occurred in gemini_run_query
            # gemini_run_query already prints a detailed error.
            # This function needs to return the specific error structure it promised.
            print(f"AI query failed for titles starting with '{titles_batch[0] if titles_batch else 'N/A'}'.")
            return {title: ["Error"] for title in titles_batch}
            
        return brands_map
    except Exception as e: # Catch any other unexpected errors during the process
        print(f"Unexpected error in extract_brands_with_ai for titles starting with '{titles_batch[0] if titles_batch else 'N/A'}': {e}")
        return {title: ["Error"] for title in titles_batch}

# Process titles in batches
print("=== Extracting Brands using generative AI ===")

# Prepare suggested brands from earlier tokenization (cell 18)
suggested_brands_from_tokenization = []
if 'category_token_counts' in globals() and isinstance(category_token_counts, dict) and 'brands' in category_token_counts:
    suggested_brands_from_tokenization = list(category_token_counts['brands'].keys())
    print(f"Using {len(suggested_brands_from_tokenization)} suggested brands from tokenization analysis.")
    # Optional: Display a sample of suggested brands
    # print(f"Sample of suggested brands: {suggested_brands_from_tokenization[:10]}")
else:
    print("Warning: 'category_token_counts' not found or 'brands' key missing. Proceeding without brand suggestions for the AI.")


batch_size = 200  # Small batch size to avoid API limits
final_merged_brands_map = {} # To store all brand maps from all batches

# Get unique titles for processing
unique_titles = df_products['title'].unique().tolist()
print(f"Processing {len(unique_titles)} unique product titles in batches of {batch_size}...")

for i in tqdm(range(0, len(unique_titles), batch_size)):
    print(f"Processing batch {i // batch_size + 1}...")
    current_batch_titles = unique_titles[i:i+batch_size]
    
    # extract_brands_with_ai is expected to return a dict: {title: [brands], ...}
    brands_data_for_batch = extract_brands_with_ai(current_batch_titles, suggested_brands_from_tokenization)
    
    # Update the main dictionary with data from the current batch
    if isinstance(brands_data_for_batch, dict):
        final_merged_brands_map.update(brands_data_for_batch)
    else:
        # Fallback if extract_brands_with_ai doesn't return a dict
        print(f"Warning: AI processing for batch starting with '{current_batch_titles[0] if current_batch_titles else 'N/A'}' did not return a dictionary. These titles may be missing brand data or have errors.")
        for title_in_batch in current_batch_titles:
            final_merged_brands_map.setdefault(title_in_batch, ["Error - Batch Data Invalid"])
    
    time.sleep(1)


# Assign the extracted brands to the DataFrame
# Map titles to their extracted brand lists; titles not in map get NaN
df_products['brand'] = df_products['title'].map(final_merged_brands_map)

# Ensure that the 'brand' column contains lists.
# If map resulted in NaN (title not found) or if value is not a list, set to ["Unknown"].
df_products['brand'] = df_products['brand'].apply(
    lambda x: x if isinstance(x, list) else ["Unknown"]
)
print("Brand extraction completed!")


Let's display the 'title' and the extracted 'brand' for the first 10 products in the `df_products` DataFrame to show the results of the brand extraction process.

In [None]:
display(df_products[['title', 'brand']].head(10))
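
Because the model returns titles as JSON keys, a key that differs slightly from the original title (for example in whitespace or casing) will not match during `map`, and the product ends up with `["Unknown"]`. The following sketch shows one way to reconcile keys and list the titles that are still missing. The helper name `reconcile_keys` and the sample data are hypothetical and not part of the lab.

```python
def reconcile_keys(titles, results_map):
    """Re-key results_map by the original titles, matching on normalized text.

    Returns (matched, missing): a dict keyed by the original titles and a
    list of titles that have no counterpart in results_map.
    """
    def normalize(text):
        # Collapse runs of whitespace and ignore case
        return " ".join(str(text).split()).lower()

    by_normalized = {normalize(k): v for k, v in results_map.items()}
    matched, missing = {}, []
    for title in titles:
        key = normalize(title)
        if key in by_normalized:
            matched[title] = by_normalized[key]
        else:
            missing.append(title)
    return matched, missing

# Hypothetical data: the model collapsed a double space in the first title
example_titles = ["Acme  Trail Shoe", "Generic Hat"]
example_model_output = {"Acme Trail Shoe": ["Acme"]}
matched, missing = reconcile_keys(example_titles, example_model_output)
print(matched)
print(missing)
```

Titles reported in `missing` could be retried in a later batch rather than silently defaulting to `["Unknown"]`.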

## Description generation
Next, you generate product descriptions using the Gemini model. This helps you create compelling and informative descriptions for each product based on its title and other attributes.

In [None]:
from google import genai
import json
import time
from tqdm import tqdm

# Ensure client is initialized (it should be from the previous cell, but good for clarity)
# client = genai.Client(project=project_id, location="us-central1", vertexai=True)

def generate_descriptions_with_ai(titles_batch):
    """
    Generate product descriptions from a batch of product titles using generative AI.
    """
    prompt = f"""
    You are an expert e-commerce copywriter. Your task is to generate compelling and concise product descriptions 
    suitable for a retail portal based on product titles.

    Rules:
    1. For each product title, generate a description that is 2-4 sentences long.
    2. Highlight key features and benefits implied by the title.
    3. Use engaging and persuasive language.
    4. Return results as a JSON map (NOT as an array of maps) where the key is the original product title and the value is the generated product description string.
    5. If a product title is too generic or uninformative to generate a meaningful description, use title as description.
    6. Ensure the output is a valid JSON object.

    Product titles to analyze:
    {chr(10).join([f"{i+1}. {title}" for i, title in enumerate(titles_batch)])}
    """

    try:
        # Call the gemini_run_query function, which handles model selection,
        # JSON response type, and basic error handling.
        descriptions_map = gemini_run_query(contents=prompt)

        if descriptions_map is None: # Error occurred in gemini_run_query
            # gemini_run_query already prints a detailed error.
            # This function needs to return the specific error structure it promised.
            print(f"AI query failed for titles starting with '{titles_batch[0] if titles_batch else 'N/A'}'.")
            return {title: "Error generating description" for title in titles_batch}
            
        return descriptions_map
    except Exception as e: # Catch any other unexpected errors during the process
        print(f"Unexpected error in generate_descriptions_with_ai for titles starting with '{titles_batch[0] if titles_batch else 'N/A'}': {e}")
        return {title: "Error generating description" for title in titles_batch}

# Process titles for descriptions in batches
print("\n=== Generating Product Descriptions using generative AI ===")
batch_size = 100  # Adjust batch size as needed, considering prompt length and API limits for descriptions
final_merged_descriptions_map = {}

# Get unique titles for processing
unique_titles_for_description = df_products['title'].unique().tolist()
print(f"Processing {len(unique_titles_for_description)} unique product titles for descriptions in batches of {batch_size}...")

for i in tqdm(range(0, len(unique_titles_for_description), batch_size)):
    print(f"Processing description batch {i // batch_size + 1}...")
    current_batch_titles = unique_titles_for_description[i:i+batch_size]
    
    descriptions_data_for_batch = generate_descriptions_with_ai(current_batch_titles)
    
    if isinstance(descriptions_data_for_batch, dict):
        final_merged_descriptions_map.update(descriptions_data_for_batch)
    else:
        print(f"Warning: AI processing for description batch starting with '{current_batch_titles[0] if current_batch_titles else 'N/A'}' did not return a dictionary.")
        for title_in_batch in current_batch_titles:
            final_merged_descriptions_map.setdefault(title_in_batch, "Error - Batch Data Invalid")
    
    time.sleep(1) # Respect API rate limits

# Assign the generated descriptions to the DataFrame
df_products['description'] = df_products['title'].map(final_merged_descriptions_map)

# Ensure that the 'description' column contains strings, default if not.
df_products['description'] = df_products['description'].apply(
    lambda x: x if isinstance(x, str) else "Description not available"
)


Let's take a look at the first 10 products in the `df_products` DataFrame to see the generated descriptions alongside their titles.

In [None]:
print("\nProduct description generation completed!")
display(df_products[['title', 'description']].head(10))
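
The brand and description cells follow the same pattern: slice the unique titles into batches, call the model, merge the returned dictionary, and fall back to a default value when a batch fails. As a rough sketch, that loop could be factored into a helper. The names `run_in_batches` and `fake_model_call` are hypothetical, and the stand-in function below does not call any real API.

```python
import time

def run_in_batches(items, batch_size, process_batch, fallback_value, pause_seconds=0.0):
    """Call process_batch on consecutive slices of items and merge the returned dicts.

    If a call raises or does not return a dict, every item in that slice
    is assigned fallback_value instead.
    """
    merged = {}
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        try:
            result = process_batch(batch)
        except Exception as exc:
            print(f"Batch starting at index {start} failed: {exc}")
            result = None
        if not isinstance(result, dict):
            result = {item: fallback_value for item in batch}
        merged.update(result)
        if pause_seconds:
            time.sleep(pause_seconds)  # Simple pacing between calls
    return merged

# Stand-in for a model call (hypothetical): upper-cases titles, fails on one batch
def fake_model_call(batch):
    if "bad title" in batch:
        return None
    return {title: title.upper() for title in batch}

example_titles = ["red shoe", "blue hat", "bad title", "green sock"]
batched_result = run_in_batches(example_titles, 2, fake_model_call, fallback_value="Error")
print(batched_result)
```

Note that a failed batch marks every title in it with the fallback value, which mirrors the behavior of the cells above.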

## Tag generation
Now, you generate product tags using the Gemini model. Tags are essential for improving searchability and categorization of products in the catalog. You use the model to create relevant tags based on product titles and descriptions. You also incorporate the brand information extracted earlier to ensure tags are relevant and comprehensive.

In [None]:
from google import genai
import json
import time
from tqdm import tqdm
import re # Ensure re is imported

# Ensure client is initialized
# client = genai.Client(project=project_id, location="us-central1", vertexai=True)

def generate_tags_with_ai(batch_data, suggested_tags):
    """
    Generate tags from a batch of product titles, descriptions, and brands using generative AI.
    Brands (provided in batch_data) are removed from the generated tags during post-processing.
    Suggested tags are provided to the AI as a hint.
    Batch data should be a list of dictionaries, each with 'title', 'description', and 'brand'.
    """

    tag_rules = """
      Tag best practices:
      1. Tag Length: The ideal length for a tag is typically 1 to 3 words.

      The goal is to capture the essence of a searchable keyword or a specific attribute. Think about what a user would realistically type into a search bar or what would make sense as a clickable filter in a sidebar.

      Good Examples (1-3 words):
      - organic cotton
      - water-resistant
      - summer collection
      - vintage wash
      - limited edition
      - graphic print


      2. What to Avoid:
      - Too Broad (often 1 word): A tag like "shirt" is often too generic if the product is already in a "Shirts" category. It adds very little new information.
      - Too Long (like a sentence): A tag like "this t-shirt is made from premium sustainable cotton" is not a tag; it's a description. It's ineffective for faceting and won't match user search behavior.
    """
    prompt_parts = [
        "You are an expert ecommerce product tagger. Your task is to generate a list of 5 to 10 relevant tags for each product based on its title and description.",
        "Focus on keywords that customers might use to search for these products.",
        "Tags should be concise and can include product attributes, category, use cases, or key features.",
        tag_rules,
        "Consider these potential tags, which were identified during an initial analysis. Use them as strong suggestions if applicable, but also generate other relevant tags:",
        json.dumps(suggested_tags),
        "Return results as a JSON map where the key is the original product title and the value is an array of generated tag strings.",
        "If a product title or description is too generic or uninformative to generate meaningful tags, return an empty array for that title.",
        "Ensure the output is a valid JSON object.",
        "\nProduct data to analyze:"
    ]
    
    for i, item in enumerate(batch_data):
        prompt_parts.append(f"{i+1}. Title: {item['title']}\n   Description: {item['description']}")
        
    prompt = "\n".join(prompt_parts)

    try:
        raw_tags_map = gemini_run_query(
                    contents=prompt,
        )

        # Post-process to remove brands from tags
        final_tags_map = {}
        for item_in_batch in batch_data: # Iterate through the input batch_data to get brand info
            title = item_in_batch['title']
            product_brands = item_in_batch.get('brand', []) 
            
            if not isinstance(product_brands, list):
                product_brands = [] 

            # Filter out "Unknown" or "Error" brands, convert to lowercase, and remove empty strings
            valid_brands_lower = [
                b.lower().strip() for b in product_brands 
                if isinstance(b, str) and b.lower().strip() not in ["unknown", "error", ""]
            ]
            # Sort by length descending to remove longer brand names first (e.g., "google pixel" before "google")
            valid_brands_lower.sort(key=len, reverse=True)
            
            ai_generated_tags = raw_tags_map.get(title, [])
            if not isinstance(ai_generated_tags, list): # Ensure AI output for title is a list
                ai_generated_tags = []

            cleaned_product_tags = set() # Use set to avoid duplicate tags after cleaning
            for tag_str in ai_generated_tags:
                if not isinstance(tag_str, str) or not tag_str.strip(): # Skip if tag is not a string or empty
                    continue
                
                modified_tag_str = tag_str.lower() # Work with lowercase tag

                for brand_lower in valid_brands_lower:
                    if not brand_lower: # Skip empty brand strings from the sorted list
                        continue
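                    # Remove brand, ensuring whole word match. The pattern must be a raw f-string so that
                    # \b is a regex word boundary (in a regular string it is a backspace character).
                    # re.escape handles special chars in brand names.
                    modified_tag_str = re.sub(rf'\b{re.escape(brand_lower)}\b', '', modified_tag_str, flags=re.IGNORECASE)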
                
                # Clean up extra spaces that might result from removal and normalize multiple spaces to one
                modified_tag_str = ' '.join(modified_tag_str.split()).strip()

                if modified_tag_str: # Add tag only if it's not empty after brand removal
                    cleaned_product_tags.add(modified_tag_str)
            
            final_tags_map[title] = sorted(list(cleaned_product_tags)) # Store unique, sorted tags

        return final_tags_map

    except Exception as e:
        print(f"Error generating tags for batch starting with '{batch_data[0]['title'] if batch_data else 'N/A'}': {e}")
        # Return a map with empty tag lists for all items in the batch in case of error
        return {item['title']: [] for item in batch_data}

# Prepare data for tag generation: unique title-description-brand sets
# Ensure 'brand' column exists from previous brand extraction cell
# Use drop_duplicates on title, assuming brand and description are dependent on title for uniqueness in this context.
df_unique_products_for_tags = df_products[['title', 'description', 'brand']].drop_duplicates(subset=['title']).reset_index(drop=True)

print("\n=== Generating Product Tags using generative AI (brands will be excluded) ===")

# Prepare suggested tags from earlier tokenization (cell 18)
suggested_tags_from_tokenization = []
if 'category_token_counts' in globals() and isinstance(category_token_counts, dict) and 'tags' in category_token_counts:
    suggested_tags_from_tokenization = list(category_token_counts['tags'].keys())
    print(f"Using {len(suggested_tags_from_tokenization)} suggested tags from tokenization analysis.")
    # Optional: Display a sample of suggested tags
    # print(f"Sample of suggested tags: {suggested_tags_from_tokenization[:20]}")
else:
    print("Warning: 'category_token_counts' not found or 'tags' key missing. Proceeding without tag suggestions for the AI.")


batch_size_tags = 100  # Adjust based on typical length of title + description and token limits
final_merged_tags_map = {}

print(f"Processing {len(df_unique_products_for_tags)} unique title/description/brand combinations for tags in batches of {batch_size_tags}...")

for i in tqdm(range(0, len(df_unique_products_for_tags), batch_size_tags)):
    print(f"Processing tags batch {i // batch_size_tags + 1}...")
    current_batch_data = []
    for idx, row in df_unique_products_for_tags.iloc[i:i+batch_size_tags].iterrows():
        # Include brand in the data passed to the AI function for post-processing
        current_batch_data.append({'title': row['title'], 
                                   'description': row['description'], 
                                   'brand': row['brand']}) # Add brand here
    
    if not current_batch_data:
        continue
        
    # Pass suggested_tags_from_tokenization to the function
    tags_data_for_batch = generate_tags_with_ai(current_batch_data, suggested_tags_from_tokenization)
    
    if isinstance(tags_data_for_batch, dict):
        final_merged_tags_map.update(tags_data_for_batch)
    else:
        print(f"Warning: AI processing for tags batch did not return a dictionary.")
        # Default to empty list for titles in this problematic batch
        for item in current_batch_data:
            final_merged_tags_map.setdefault(item['title'], []) 
    
    time.sleep(1) # Respect API rate limits

# Assign the generated tags to the DataFrame
df_products['tags'] = df_products['title'].map(final_merged_tags_map)

# Ensure that the 'tags' column contains lists, default to empty list if not found or not a list.
df_products['tags'] = df_products['tags'].apply(
    lambda x: x if isinstance(x, list) else []
)


Let's check a few products to see how `tags` were generated based on the product titles and descriptions.

In [None]:

print("\nProduct tag generation completed! (Brands excluded from tags)")
display(df_products[['title', 'tags']].head(10))
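
The brand-removal step in the cell above depends on whole-word matching, so that a brand is removed from a tag only when it appears as a separate word. The sketch below isolates that step. The helper name `strip_brands` and the sample brands are hypothetical. The pattern is written as a raw f-string so that `\b` is interpreted as a regex word boundary.

```python
import re

def strip_brands(tag, brands):
    """Remove whole-word brand mentions from a tag, case-insensitively."""
    cleaned = tag.lower()
    # Longest brands first, so "google pixel" is removed before "google"
    ordered = sorted((b.lower().strip() for b in brands if b.strip()), key=len, reverse=True)
    for brand in ordered:
        cleaned = re.sub(rf"\b{re.escape(brand)}\b", "", cleaned)
    # Collapse any double spaces left behind by the removal
    return " ".join(cleaned.split())

print(strip_brands("Nike running shoes", ["Nike"]))
print(strip_brands("google pixel case", ["Google", "Google Pixel"]))
print(strip_brands("nikelab jacket", ["Nike"]))  # "nike" is not a whole word here
```

The last call leaves the tag unchanged because the brand appears only as part of a longer word.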

Let's analyze the generated tags to see how well the model performed. Look at how many unique tags were produced and which tags occur most often.

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

print("\n=== Analyzing Product Tags ===")

# Ensure all tags are strings and convert to lowercase
df_products['tags'] = df_products['tags'].apply(
    lambda tag_list: [str(tag).lower() for tag in tag_list if isinstance(tag, str) or not pd.isna(tag)] if isinstance(tag_list, list) else []
)

# Explode the tags list into separate rows for individual tag analysis
all_tags = df_products['tags'].explode().dropna() # dropna to remove any NaN from explode if a product had an empty list initially

# Calculate tag frequencies
tag_counts = all_tags.value_counts()

print(f"Total unique tags (after lowercase): {len(tag_counts)}")
print("\nTop 20 most common tags:")
display(tag_counts.head(20))


Let's see if you can generalize some tags to avoid 'tag swamp' and improve searchability. Consolidate tags by grouping similar ones together, ensuring that the final set of tags is both comprehensive and relevant.

In [None]:
from google import genai
import json
import time
from tqdm import tqdm
import pandas as pd

# Ensure client is initialized (it should be from a previous cell)
# client = genai.Client(project=project_id, location="us-central1", vertexai=True)

def consolidate_tags_with_ai(tags_batch):
    """
    Consolidate a batch of tags using generative AI.
    """
    prompt = f"""
    You are an expert data normalizer specializing in e-commerce product tags.
    Your task is to consolidate a list of product tags to reduce variability and group semantically similar tags under a single canonical form.
    Aim to reduce the overall number of unique tags significantly, towards a target of 50-100 unique tags.

    Rules:
    1. Analyze the provided list of tags.
    2. Identify groups of tags that are synonyms, misspellings, plural/singular variations, or closely related concepts.
    3. For each group, suggest a single, concise, and representative canonical tag.
    4. If a tag is already in a good canonical form or is unique and distinct, it can map to itself.
    5. Prioritize broader terms if multiple specific terms can be grouped (e.g., "hdmi cable," "usb cable" could remain distinct or be grouped under "cables" if very aggressive consolidation is needed, but for now, try to keep meaningful distinctions unless they are very minor variations).
    6. Ensure the canonical tag is in lowercase.
    7. Remove the tags that are too generic or not useful for search (e.g., "product", "item", "thing", "location") unless they are part of a meaningful tag.
    8. Try to generate tags that are non-overlapping and distinct, avoiding redundancy.
    9. When you generate an initial list of canonical tags, ensure they are distinct and meaningful, perform another iteration of analysis based on the rules above, to avoid redundancy.
    10. Return the results as a JSON map where the key is the original tag from the input batch and the value is its suggested canonical tag.

    Example:
    Input Tags: ["running shoe", "Running Shoes", "runningshoe", "runner", "white sneakers", "sneaker", "sporty shoe", "athletic footwear", "trainer shoes"]
    Output JSON Map: {{
        "running shoe": "running shoe",
        "Running Shoes": "running shoe",
        "runningshoe": "running shoe",
        "runner": "running shoe",
        "white sneakers": "sneaker",
        "sneaker": "sneaker",
        "sporty shoe": "sneaker",
        "athletic footwear": "athletic shoe",
        "trainer shoes": "athletic shoe"
    }}
    (Note: This example is illustrative. Your grouping might be different based on the overall list and desired consolidation level.)

    Tags to consolidate:
    {json.dumps(tags_batch)}
    """
    try:
        consolidation_map = gemini_run_query(
            contents=prompt,
        )
        return consolidation_map
    except Exception as e:
        print(f"Error consolidating tags for batch: {e}")
        # Fallback: return a map where each tag maps to itself
        return {tag: tag for tag in tags_batch}

print("\n=== Consolidating Product Tags using generative AI ===")

# Ensure tags are lowercase first (assuming previous cell was run)
df_products['tags'] = df_products['tags'].apply(
    lambda tag_list: [str(tag).lower() for tag in tag_list if isinstance(tag, str) or not pd.isna(tag)] if isinstance(tag_list, list) else []
)
all_lowercase_tags = df_products['tags'].explode().dropna().unique().tolist()

print(f"Found {len(all_lowercase_tags)} unique lowercase tags to consolidate.")

batch_size_consolidation = 500 # Adjust based on typical number of tags and prompt limits
master_consolidation_map = {}

if all_lowercase_tags:
    for i in tqdm(range(0, len(all_lowercase_tags), batch_size_consolidation)):
        print(f"Processing consolidation batch {i // batch_size_consolidation + 1}...")
        current_tags_batch = all_lowercase_tags[i:i+batch_size_consolidation]
        
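        batch_map = consolidate_tags_with_ai(current_tags_batch)
        if not isinstance(batch_map, dict):
            # gemini_run_query can return None on failure; fall back to an identity mapping
            print("Warning: consolidation batch did not return a dictionary. Keeping original tags for this batch.")
            batch_map = {tag: tag for tag in current_tags_batch}
        master_consolidation_map.update(batch_map)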
        
        time.sleep(1) # Respect API rate limits
else:
    print("No tags found to consolidate.")

print(f"Master consolidation map created with {len(master_consolidation_map)} entries.")

def apply_tag_consolidation(tag_list, consolidation_map):
    if not isinstance(tag_list, list):
        return []
    consolidated_tags = set() # Use a set to store unique canonical tags for the product
    for tag in tag_list:
        tag_lower = str(tag).lower() # Ensure it's lowercase, though it should be already
        canonical_tag = consolidation_map.get(tag_lower, tag_lower) # Get from map, or use original if not found
        consolidated_tags.add(canonical_tag)
    return sorted(list(consolidated_tags))

# Apply the consolidation map to the 'tags' column
if master_consolidation_map:
    df_products['tags'] = df_products['tags'].apply(lambda x: apply_tag_consolidation(x, master_consolidation_map))
    print("\nProduct tags consolidated.")
    # Calculate and print the number of unique tags after consolidation
    all_consolidated_tags = df_products['tags'].explode().dropna().unique().tolist()
    print(f"Total unique tags after consolidation: {len(all_consolidated_tags)}")
else:
    print("\nNo consolidation map created, tags remain unchanged.")
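
Model output is not guaranteed to follow the requested shape: it may include keys that were not in the batch, or empty or non-lowercase values. The following sketch shows one way to sanitize a consolidation map before applying it. The helper name `sanitize_consolidation_map` and the sample data are hypothetical and not part of the lab.

```python
def sanitize_consolidation_map(raw_map, tags_batch):
    """Build a tag -> canonical tag map that covers every tag in tags_batch.

    Entries with unexpected keys or with empty/non-string values are dropped,
    and any tag left without a mapping maps to itself.
    """
    allowed = set(tags_batch)
    clean = {}
    if isinstance(raw_map, dict):
        for key, value in raw_map.items():
            if key in allowed and isinstance(value, str) and value.strip():
                clean[key] = value.strip().lower()
    for tag in tags_batch:
        clean.setdefault(tag, tag)
    return clean

# Hypothetical model output with an empty value and an unexpected key
example_batch = ["running shoe", "running shoes", "sneaker"]
example_raw = {"running shoes": "Running Shoe", "sneaker": "", "made up tag": "x"}
sanitized = sanitize_consolidation_map(example_raw, example_batch)
print(sanitized)
```

Falling back to an identity mapping keeps every original tag usable even when part of the model response is unusable.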


In [None]:
print("number of tags before consolidation:", len(all_lowercase_tags))
print("number of tags after consolidation:", len(df_products['tags'].explode().dropna().unique().tolist()))

# Calculate and display top 20 consolidated tags with counts
consolidated_tag_counts = df_products['tags'].explode().dropna().value_counts()

# --- New: Side-by-side comparison of top 20 initial and consolidated tags ---
print("\n=== Comparison: Top 20 Initial vs. Consolidated Tags ===")

# Get top 20 initial tags (assuming 'tag_counts' is available from Cell 27)
if 'tag_counts' in globals():
    top_initial_tags = tag_counts.head(20).reset_index()
    top_initial_tags.columns = ['Initial Tag', 'Initial Count']
else:
    print("Warning: 'tag_counts' (initial tags) not found. Skipping initial tags display.")
    # Create an empty DataFrame with expected columns if tag_counts is not available
    top_initial_tags = pd.DataFrame(columns=['Initial Tag', 'Initial Count'])


top_consolidated_tags = consolidated_tag_counts.head(20).reset_index()
top_consolidated_tags.columns = ['Consolidated Tag', 'Consolidated Count']

# Combine for side-by-side display.
# pd.concat with axis=1 aligns rows by index; after reset_index() the index is the rank (0..19).
# If one frame has fewer rows than the other, the missing cells become NaN.
df_comparison = pd.concat([top_initial_tags, top_consolidated_tags], axis=1)

# Replace NaN with an empty string for a cleaner display
df_comparison = df_comparison.fillna('')

display(df_comparison)

The consolidation significantly reduced the number of distinct tags. Here is a sample of the `tags` field after consolidation.

In [None]:
display(df_products[['title', 'tags']].head(10))
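
To make the consolidation step concrete, here is a minimal, self-contained sketch of the same idea on toy data. The map and tags below are hypothetical and are not taken from the lab dataset; the helper mirrors the lookup-or-keep logic of `apply_tag_consolidation`.

```python
def consolidate(tags, mapping):
    """Map each tag to its canonical form, then de-duplicate and sort."""
    canonical = set()
    for tag in tags:
        tag_lower = str(tag).lower()
        # Fall back to the tag itself when the map has no entry for it
        canonical.add(mapping.get(tag_lower, tag_lower))
    return sorted(canonical)

# Hypothetical consolidation map: variant -> canonical tag
example_map = {"tees": "t-shirt", "tshirt": "t-shirt", "hoody": "hoodie"}

example_tags = ["Tees", "tshirt", "hoodie", "hoody", "mug"]
consolidated = consolidate(example_tags, example_map)
print(consolidated)  # ['hoodie', 'mug', 't-shirt']
```

Five raw tags collapse to three canonical ones, which is the effect you see at scale in the comparison table.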

## Color attribute extraction
Finally, you extract color attributes from the product titles. You use the Gemini model to identify the color families and specific colors associated with each product, which lets you enhance the product catalog with detailed color information. You could also have analyzed the images referenced in the `image` field, but for simplicity you focus on text data only.
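
The batching pattern used for this step can be sketched without calling any model. In the sketch below, `fake_extract` is a hypothetical stand-in for the Gemini call, and `merge_batches` is an illustrative helper rather than the notebook's own code: it gives every title an entry, falling back to empty lists when a batch fails or omits a title.

```python
def batched(items, size):
    """Yield consecutive slices of `items` with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

def empty_entry():
    return {"colorFamilies": [], "colors": []}

def merge_batches(titles, extract_fn, size):
    """Run extract_fn per batch and merge results into one title -> entry map."""
    merged = {}
    for batch in batched(titles, size):
        try:
            result = extract_fn(batch)
        except Exception:
            result = None  # treat a failed batch as "no colors found"
        if not isinstance(result, dict):
            result = {}
        for title in batch:
            entry = result.get(title)
            merged[title] = entry if isinstance(entry, dict) else empty_entry()
    return merged

def fake_extract(batch):
    # Hypothetical stand-in for the model call: knows only about "Navy"
    if any("Broken" in title for title in batch):
        raise RuntimeError("simulated model failure")
    return {
        title: {"colorFamilies": ["Blue"], "colors": ["navy"]}
        for title in batch
        if "Navy" in title
    }

titles = ["Navy Tote Bag", "Android Phone", "Broken Title"]
color_map = merge_batches(titles, fake_extract, size=2)
print(color_map)
```

With a batch size of 2, the first batch succeeds and the second raises, yet all three titles end up with a well-formed entry.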

In [None]:
from google import genai
import json
import time
from tqdm import tqdm
import pandas as pd

# Ensure the genai client is initialized (it should be from a previous cell)
# client = genai.Client(project=project_id, location="us-central1", vertexai=True)

def generate_color_attributes_with_ai(titles_batch):
    """
    Generate colorFamilies and colors from a batch of product titles using generative AI.
    titles_batch: A list of product titles.
    """
    prompt_parts = [
        "You are an expert in e-commerce product data analysis. Your task is to extract color families and specific colors from product information.",
        "Based on the provided product title, identify for each product:", # Changed to title only
        "1.  `colorFamilies`: A list of standard color groups.",
        "    *   Strongly recommended to use only these values: \"Red\", \"Pink\", \"Orange\", \"Yellow\", \"Purple\", \"Green\", \"Cyan\", \"Blue\", \"Brown\", \"White\", \"Gray\", \"Black\", \"Mixed\".",
        "    *   Normally, provide only 1 color family. If multiple distinct color families are clearly present and necessary, list them, but prefer \"Mixed\" if appropriate.",
        "    *   Maximum of 5 values allowed. Each value must be a UTF-8 encoded string, max 128 characters.",
        "    *   If no color is identifiable from the title, return an empty list for this key.", # Clarified source
        "2.  `colors`: A list of color display names (e.g., frontend aliases like \"scarlet red\", \"ocean blue\").",
        "    *   Normally, provide only 1 color. If multiple distinct colors are clearly present and necessary, list them.",
        "    *   Maximum of 75 colors allowed. Each value must be a UTF-8 encoded string, max 128 characters.",
        "    *   If no color is identifiable from the title, return an empty list for this key.", # Clarified source
        "",
        "Return results as a single JSON map where the key is the original product title, and the value is a JSON object with two keys: \"colorFamilies\" (a list of strings) and \"colors\" (a list of strings).",
        "Example for a single product entry in the map:",
        "\"Product Title Example\": { \"colorFamilies\": [\"Red\"], \"colors\": [\"Scarlet Red\", \"Crimson\"] }",
        "If no colors can be determined for a product from its title, its entry should be like:", # Clarified source
        "\"Another Product Title\": { \"colorFamilies\": [], \"colors\": [] }",
        "Ensure the entire output is a valid JSON object (a single map).",
        "\nProduct titles to analyze:" # Changed from "Product data to analyze:"
    ]

    for i, title_str in enumerate(titles_batch):
        prompt_parts.append(f"{i+1}. Title: {title_str}") # Use title_str directly
        
    prompt = "\n".join(prompt_parts)

    try:
        color_attributes_map = gemini_run_query(
            contents=prompt,
        )
        return color_attributes_map
    except Exception as e:
        print(f"Error generating color attributes for batch starting with title '{titles_batch[0] if titles_batch else 'N/A'}': {e}")
        # Return a map with empty color lists for all items in the batch in case of error
        return {title: {"colorFamilies": [], "colors": []} for title in titles_batch}

# Prepare data for color attribute generation.
# Only unique titles are sent to the model; 'description' is not used for color extraction.
# A placeholder is added if the column is missing so that later cells that select it still work.
if 'description' not in df_products.columns:
    df_products['description'] = "Not available" 

df_unique_titles_for_colors = df_products[['title']].drop_duplicates(subset=['title']).reset_index(drop=True)

print("\n=== Generating Color Attributes (Families and Colors) using generative AI (from Titles only) ===")
batch_size_colors = 200  # Adjust based on typical length of title and token limits
final_merged_color_attributes_map = {}

print(f"Processing {len(df_unique_titles_for_colors)} unique titles for color attributes in batches of {batch_size_colors}...")

for i in tqdm(range(0, len(df_unique_titles_for_colors), batch_size_colors)):
    print(f"Processing color attributes batch {i // batch_size_colors + 1}...")
    current_batch_titles_list = []
    for idx, row in df_unique_titles_for_colors.iloc[i:i+batch_size_colors].iterrows():
        current_batch_titles_list.append(row['title']) # Append only title string
    
    if not current_batch_titles_list:
        continue
        
    batch_color_map = generate_color_attributes_with_ai(current_batch_titles_list)
    
    if isinstance(batch_color_map, dict):
        final_merged_color_attributes_map.update(batch_color_map)
    else:
        print(f"Warning: AI processing for color attributes batch did not return a dictionary for titles starting with '{current_batch_titles_list[0] if current_batch_titles_list else 'N/A'}'.")
        for title_str in current_batch_titles_list:
            final_merged_color_attributes_map.setdefault(title_str, {"colorFamilies": [], "colors": []})
    
    time.sleep(1) # Respect API rate limits

# Assign the generated color attributes to the DataFrame
def get_color_families_from_map(title):
    return final_merged_color_attributes_map.get(title, {}).get('colorFamilies', [])

def get_colors_from_map(title):
    return final_merged_color_attributes_map.get(title, {}).get('colors', [])

df_products['colorFamilies'] = df_products['title'].apply(get_color_families_from_map)
df_products['colors'] = df_products['title'].apply(get_colors_from_map)

# Ensure the new columns are lists
df_products['colorFamilies'] = df_products['colorFamilies'].apply(lambda x: x if isinstance(x, list) else [])
df_products['colors'] = df_products['colors'].apply(lambda x: x if isinstance(x, list) else [])


Let's do a sanity check and look at the first 10 products where `colorFamilies` or `colors` were populated.

In [None]:

print("\nColor attribute generation completed! (from Titles only)")
display(
    df_products[
        (df_products['colorFamilies'].apply(lambda x: isinstance(x, list) and len(x) > 0)) |
        (df_products['colors'].apply(lambda x: isinstance(x, list) and len(x) > 0))
    ][['title', 'colorFamilies', 'colors']].head(10)
)
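
Because the model is only "strongly recommended" to stay within the listed color families, a post-processing pass can be useful. The following is a minimal sketch, not part of the original notebook: the allowed values and the limit of 5 come from the prompt above, while `normalize_color_families` and its choice to drop unknown values are assumptions made for illustration.

```python
ALLOWED_FAMILIES = [
    "Red", "Pink", "Orange", "Yellow", "Purple", "Green", "Cyan",
    "Blue", "Brown", "White", "Gray", "Black", "Mixed",
]
# Case-insensitive lookup from lowercase name to canonical spelling
_CANONICAL = {name.lower(): name for name in ALLOWED_FAMILIES}

def normalize_color_families(values, max_items=5):
    """Keep only allowed families, fix casing, drop duplicates, cap the length."""
    if not isinstance(values, list):
        return []
    cleaned = []
    for value in values:
        canonical = _CANONICAL.get(str(value).strip().lower())
        if canonical and canonical not in cleaned:
            cleaned.append(canonical)
    return cleaned[:max_items]

normalized = normalize_color_families(["blue", " Blue", "Teal", "BLACK"])
print(normalized)  # ['Blue', 'Black']
```

Applied to the `colorFamilies` column, a helper like this would keep the values within the recommended vocabulary before the data is written to BigQuery.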

## Create and populate enhanced product table

You have now enriched your product data with additional attributes such as brand, description, tags, and color information. Take a look at the `df_products` DataFrame, which contains the original product data along with the enriched fields you generated using generative AI models.

In [None]:
display(df_products.head(50))

You now create a new table in BigQuery to store this enhanced product data. This table is named `products_enhanced` and includes all the original fields along with the newly generated attributes.

The process involves the following key stages to create and populate the `products_enhanced` table in BigQuery:

1.  **Table cloning for baseline:**
    *   A new table, `products_enhanced`, will be created by cloning the existing `retail.products` table. This will ensure that all original data and the existing schema structure are preserved as a baseline.

2.  **Schema extension for enriched attributes:**
    *   The schema of the newly cloned `products_enhanced` table will be extended. This involves adding new columns required to store the enriched product attributes (such as brands, description, tags, and color information) that are not present in the original table. The complete target schema, inclusive of these new fields, is typically predefined (e.g., loaded from a Google Cloud Storage file).

3.  **Populating extended attributes:**
    *   The newly added columns in the `products_enhanced` table will be populated with the enriched data generated within this notebook. This will be achieved by:
        *   Preparing the enriched data (e.g., from the `df_products` Pandas DataFrame).
        *   Loading this prepared data into a temporary BigQuery table.
        *   Executing a `MERGE` operation to update the `products_enhanced` table, matching records by product title and filling in the values for the extended attributes.

4.  **Verification and cleanup:**
    *   Post-population, a verification step will be performed by querying a sample of the updated records to ensure data integrity.
    *   Any temporary resources, such as the intermediate table used for the merge, will be cleaned up.
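
The schema extension stage above can be sketched without any cloud calls. The sketch below works on plain dictionaries in the BigQuery JSON schema format (`name`, `type`, `mode`, `fields`). The project name, the sample schema, and the helper names are hypothetical, and the mapping of legacy type names (such as `INTEGER` to `INT64`) is an addition for illustration that the notebook code does not perform.

```python
# Legacy schema type names and their GoogleSQL DDL equivalents
LEGACY_TO_DDL = {"INTEGER": "INT64", "FLOAT": "FLOAT64", "BOOLEAN": "BOOL"}

def ddl_type(field):
    """Convert one JSON schema field into a DDL type string."""
    field_type = field["type"].upper()
    if field_type in ("RECORD", "STRUCT"):
        inner = ", ".join(
            f"`{sub['name']}` {ddl_type(sub)}" for sub in field.get("fields", [])
        )
        base = f"STRUCT<{inner}>"
    else:
        base = LEGACY_TO_DDL.get(field_type, field_type)
    # REPEATED mode corresponds to an ARRAY in DDL
    if field.get("mode", "NULLABLE").upper() == "REPEATED":
        return f"ARRAY<{base}>"
    return base

def build_alter_statements(table_id, existing_columns, target_schema):
    """Return one ALTER TABLE statement per column missing from the table."""
    return [
        f"ALTER TABLE `{table_id}` ADD COLUMN `{field['name']}` {ddl_type(field)}"
        for field in target_schema
        if field["name"] not in existing_columns
    ]

# Hypothetical target schema and existing columns
target_schema = [
    {"name": "title", "type": "STRING", "mode": "NULLABLE"},
    {"name": "tags", "type": "STRING", "mode": "REPEATED"},
    {"name": "colorInfo", "type": "RECORD", "mode": "NULLABLE", "fields": [
        {"name": "colorFamilies", "type": "STRING", "mode": "REPEATED"},
        {"name": "colors", "type": "STRING", "mode": "REPEATED"},
    ]},
]
statements = build_alter_statements(
    "my-project.retail.products_enhanced", {"id", "title"}, target_schema
)
for statement in statements:
    print(statement)
```

Only `tags` and `colorInfo` produce statements here, because `title` already exists in the cloned table.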

### Table cloning and schema extension

In [None]:
from google.cloud import bigquery
from google.cloud import storage
import json

# Ensure BigQuery client is initialized
client = bigquery.Client(project=project_id)

# Define table names and schema path
TABLE_ENHANCED_ID = f"{project_id}.retail.products_enhanced"
SOURCE_TABLE_ID = f"{project_id}.retail.products"
SCHEMA_GCS_PATH = f"gs://{SCRIPTS_BUCKET}/vaisc-configs/retail_products_schema.json"

print(f"Target table for creation/update: {TABLE_ENHANCED_ID}")
print(f"Source table for copy: {SOURCE_TABLE_ID}")
print(f"Schema GCS path for adding columns: {SCHEMA_GCS_PATH}")

def bq_schema_field_to_ddl_type(field: bigquery.SchemaField) -> str:
    """Converts a BigQuery SchemaField to its DDL type string."""
    # Base type determination
    if field.field_type == "RECORD": # BigQuery client uses "RECORD" for STRUCTs
        if not field.fields: # Handle empty STRUCT if it can occur
            return "STRUCT<>"
        subfields_ddl = ", ".join([f"`{sf.name}` {bq_schema_field_to_ddl_type(sf)}" for sf in field.fields])
        base_ddl_type = f"STRUCT<{subfields_ddl}>"
    else: # Simple types like STRING, INTEGER, FLOAT64, BOOLEAN, TIMESTAMP, DATE etc.
        base_ddl_type = field.field_type

    # Apply ARRAY if mode is REPEATED
    if field.mode == "REPEATED":
        return f"ARRAY<{base_ddl_type}>"
    else:
        return base_ddl_type

try:
    # 1. Delete the enhanced table if it exists
    client.delete_table(TABLE_ENHANCED_ID, not_found_ok=True)
    print(f"Deleted table {TABLE_ENHANCED_ID} if it existed.")

    # 2. Create the enhanced table as a copy of the source table
    print(f"Executing: copying {SOURCE_TABLE_ID} to {TABLE_ENHANCED_ID}...")
    copy_job = client.copy_table(SOURCE_TABLE_ID, TABLE_ENHANCED_ID)
    copy_job.result()  # Wait for the job to complete
    print(f"Successfully created {TABLE_ENHANCED_ID} as a copy of {SOURCE_TABLE_ID}.")

    # 3. Load target schema from Google Cloud Storage
    gcs_schema_fields = []
    try:
        storage_client = storage.Client(project=project_id)
        bucket_name, blob_name = SCHEMA_GCS_PATH.replace("gs://", "").split("/", 1)
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        schema_json_string = blob.download_as_bytes()  # download_as_string() is deprecated; json.loads accepts bytes
        schema_list_of_dicts = json.loads(schema_json_string)
        gcs_schema_fields = [bigquery.SchemaField.from_api_repr(field_dict) for field_dict in schema_list_of_dicts]
        print(f"Successfully loaded target schema with {len(gcs_schema_fields)} fields from {SCHEMA_GCS_PATH}")
    except Exception as e:
        print(f"Error loading schema from Google Cloud Storage: {e}. Cannot proceed with adding columns.") 
        raise # Re-raise to stop execution if schema loading fails

    if not gcs_schema_fields:
        print("Google Cloud Storage Schema was not loaded or is empty, cannot proceed with adding columns.") 
    else:
        # 4. Get current schema of the newly created table
        current_table = client.get_table(TABLE_ENHANCED_ID)
        current_field_names = {field.name for field in current_table.schema}
        print(f"Current fields in {TABLE_ENHANCED_ID} after copy: {current_field_names}")

        # 5. Identify and add missing columns
        added_columns_count = 0
        alter_statements = []
        for gcs_field in gcs_schema_fields:
            if gcs_field.name not in current_field_names:
                column_name = gcs_field.name
                column_type_ddl = bq_schema_field_to_ddl_type(gcs_field)
                
                alter_sql = f"ALTER TABLE `{TABLE_ENHANCED_ID}` ADD COLUMN `{column_name}` {column_type_ddl}"
                alter_statements.append(alter_sql)
                print(f"  Will add column: `{column_name}` with type {column_type_ddl}")

        if alter_statements:
            print(f"\nApplying {len(alter_statements)} schema alterations...")
            for i, alter_sql in enumerate(alter_statements):
                try:
                    print(f"Executing ALTER statement {i+1}/{len(alter_statements)}: {alter_sql}")
                    alter_job = client.query(alter_sql)
                    alter_job.result()  # Wait for completion
                    print(f"Successfully executed: {alter_sql}")
                    added_columns_count += 1
                except Exception as e:
                    print(f"Error executing ALTER statement '{alter_sql}': {e}")
                    # Optionally, decide if to stop or continue. For now, it continues.
            print(f"Finished applying schema alterations. {added_columns_count} columns potentially added.")
        else:
            print("No new columns to add. Schema of copied table already contains all columns defined in Google Cloud Storage schema (by name).")

        # Verify final schema
        final_table = client.get_table(TABLE_ENHANCED_ID)
        final_field_names = {field.name for field in final_table.schema}
        print(f"Final fields in {TABLE_ENHANCED_ID}: {final_field_names}")

except Exception as e:
    print(f"An error occurred during table creation or schema update: {e}")



### Data population with extended attributes
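
BigQuery's `MERGE` raises a runtime error when more than one source row matches the same target row, so it can help to check that the join key is unique before loading. The following is a minimal sketch on plain Python rows; the sample data and helper names are hypothetical, and keeping the first row per key is one possible policy, not something the lab prescribes.

```python
from collections import Counter

def find_duplicate_keys(rows, key):
    """Return the key values that occur more than once, sorted."""
    counts = Counter(row[key] for row in rows)
    return sorted(value for value, n in counts.items() if n > 1)

def dedupe_by_key(rows, key):
    """Keep the first row seen for each key value, preserving order."""
    first_seen = {}
    for row in rows:
        first_seen.setdefault(row[key], row)
    return list(first_seen.values())

# Hypothetical rows standing in for the DataFrame that is merged by title
rows = [
    {"title": "Canvas Tote", "tags": ["bag"]},
    {"title": "Android Phone", "tags": ["phone"]},
    {"title": "Canvas Tote", "tags": ["tote"]},
]
duplicates = find_duplicate_keys(rows, "title")
unique_rows = dedupe_by_key(rows, "title")
print(duplicates)        # ['Canvas Tote']
print(len(unique_rows))  # 2
```

On a pandas DataFrame, `drop_duplicates(subset=['title'])` applies the same keep-first policy.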

In [None]:

# Display the column names of the df_products DataFrame
print(df_products.columns)
display(df_products.head(100))

In [None]:
import pandas as pd
from google.cloud import bigquery

# Ensure BigQuery client is initialized (it should be from previous cells)
client = bigquery.Client(project=project_id)

# 1. Prepare DataFrame for BigQuery load
# Select relevant columns and rename to match BQ schema
df_update = df_products[['title', 'description', 'brand', 'tags', 'colorFamilies', 'colors', 'size', 'pattern', 'audience']].copy()
df_update.rename(columns={
    'brand': 'brands',
    'pattern': 'patterns',
    'size': 'sizes'
}, inplace=True)

# Ensure list columns are not NaN, replace with empty list if so
# Convert comma-separated string columns ('patterns', 'sizes') to lists
def string_to_list(s):
    if pd.isna(s) or s == '':
        return []
    return [item.strip() for item in s.split(',')]

df_update['patterns'] = df_update['patterns'].apply(string_to_list)
df_update['sizes'] = df_update['sizes'].apply(string_to_list)

# Ensure other list columns are indeed lists and handle NaNs
for col in ['brands', 'tags', 'colorFamilies', 'colors']:
    df_update[col] = df_update[col].apply(lambda x: x if isinstance(x, list) else [])

# Ensure 'audience' is a dict, replace None/NaN with default dict structure for BQ STRUCT
def ensure_audience_structure(aud):
    if not isinstance(aud, dict):  # covers None/NaN and avoids pd.isna() on list-like values
        return {"genders": [], "ageGroups": []}
    # Ensure keys exist
    if 'genders' not in aud or not isinstance(aud['genders'], list):
        aud['genders'] = []
    if 'ageGroups' not in aud or not isinstance(aud['ageGroups'], list):
        aud['ageGroups'] = []
    return aud

df_update['audience'] = df_update['audience'].apply(ensure_audience_structure)


print(f"Prepared DataFrame with {len(df_update)} records for update.")
display(df_update)

# Define temporary table ID for loading data
TEMP_TABLE_ID = f"{project_id}.retail.products_temp_update"

# 2. Load DataFrame to a temporary BigQuery table
try:
    print(f"Loading data into temporary table: {TEMP_TABLE_ID}...")
    # Delete temp table if it exists from a previous run
    client.delete_table(TEMP_TABLE_ID, not_found_ok=True)
    print(f"Deleted temporary table {TEMP_TABLE_ID} if it existed.")

    job_config = bigquery.LoadJobConfig(
        autodetect=True # BQ can infer schema for DataFrames, including dicts as STRUCTs
    )
    load_job = client.load_table_from_dataframe(
        df_update, TEMP_TABLE_ID, job_config=job_config
    )
    load_job.result()  # Wait for the job to complete
    print(f"Successfully loaded {load_job.output_rows} rows into {TEMP_TABLE_ID}.")

    # 3. Execute MERGE statement to update the target table
    merge_query = f"""
    MERGE `{TABLE_ENHANCED_ID}` T
    USING `{TEMP_TABLE_ID}` S
    ON T.title = S.title
    WHEN MATCHED THEN
        UPDATE SET
            T.description = S.description,
            T.brands = S.brands,
            T.tags = S.tags,
            T.colorInfo = STRUCT(
                S.colorFamilies AS colorFamilies, 
                S.colors AS colors
            ),
            T.patterns = S.patterns,
            T.sizes = S.sizes,
            T.audience = STRUCT(
                S.audience.genders AS genders,
                S.audience.ageGroups AS ageGroups
            )
    """
    print("\nExecuting MERGE statement...")
    print(merge_query)
    merge_job = client.query(merge_query)
    merge_job.result()  # Wait for the job to complete
    print(f"MERGE statement completed. {merge_job.num_dml_affected_rows} rows affected in {TABLE_ENHANCED_ID}.")

finally:
    # 4. Clean up the temporary table
    client.delete_table(TEMP_TABLE_ID, not_found_ok=True)
    print(f"\nTemporary table {TEMP_TABLE_ID} deleted.")


Let's perform a final sanity check and look at the first 100 products loaded from the BigQuery table `products_enhanced` where at least one of the enriched fields is populated. 

In [None]:

# 5. Verification query
print("\nFetching a sample of updated records from products_enhanced...")
verification_query = f"""
SELECT 
    id, 
    description, 
    brands, 
    tags,
    colorInfo.colorFamilies AS colorFamilies,
    colorInfo.colors AS colors,
    patterns,
    sizes,
    audience.genders AS audience_genders,
    audience.ageGroups AS audience_ageGroups
FROM `{TABLE_ENHANCED_ID}`
WHERE description IS NOT NULL 
    OR ARRAY_LENGTH(brands) > 0 
    OR ARRAY_LENGTH(tags) > 0 
    OR ARRAY_LENGTH(colorInfo.colorFamilies) > 0 
    OR ARRAY_LENGTH(colorInfo.colors) > 0
    OR ARRAY_LENGTH(patterns) > 0
    OR ARRAY_LENGTH(sizes) > 0
    OR ARRAY_LENGTH(audience.genders) > 0
    OR ARRAY_LENGTH(audience.ageGroups) > 0
LIMIT 100
"""
df_verified = client.query(verification_query).to_dataframe()
display(df_verified)

# Verification of data enriched catalog
Now that you have populated the `products_enhanced` table with enriched product data, it is crucial to verify the integrity and accuracy of the enriched attributes. This step ensures that the data meets your expectations and is ready for use in search and recommendation systems.
Use the Vertex AI Search for commerce validation tool to perform this verification. The validation tool allows you to check the consistency and correctness of the enriched data against the expected schema and values.

## Data loading
You now load the enriched product data from the `products_enhanced` table into the Vertex AI Search for commerce catalog. For testing purposes, you use Branch 2 of the catalog. This allows you to validate the enriched data without affecting the production catalog.

You use a Python script to load the data. You can also use the [Vertex AI Search for commerce console](https://console.cloud.google.com/ai/retail/catalogs/default_catalog/data/catalog) to load the data, but using a Python script allows you to automate the process and easily repeat it in the future.
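
The loading script accepts a table reference in several forms (`table`, `dataset.table`, or `project.dataset.table`). The following sketch shows one way to resolve such a reference. `split_table_id` is a hypothetical helper; unlike the notebook function, which always uses the global `project_id`, it returns the project parsed from a three-part reference and rejects anything longer.

```python
def split_table_id(table_id, default_project, default_dataset="retail"):
    """Resolve a table reference into (project, dataset, table)."""
    parts = table_id.split(".")
    if len(parts) == 3:
        return tuple(parts)
    if len(parts) == 2:
        return (default_project, parts[0], parts[1])
    if len(parts) == 1:
        return (default_project, default_dataset, parts[0])
    raise ValueError(f"Unexpected table reference: {table_id!r}")

def branch_path(project, branch):
    """Build the catalog branch resource name used by the import request."""
    return f"projects/{project}/locations/global/catalogs/default_catalog/branches/{branch}"

# "my-project" is a placeholder project ID
print(split_table_id("products_enhanced", "my-project"))
print(split_table_id("retail.products_enhanced", "my-project"))
print(branch_path("my-project", 2))
```

Raising on unexpected input, rather than silently falling back to the default dataset, makes a mistyped reference easier to spot.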

In [None]:
from google.cloud import retail_v2
from google.cloud.retail_v2.types import ImportProductsRequest, ProductInputConfig, BigQuerySource, ImportErrorsConfig
from google.cloud import storage
import json
import time

# Define Branch 2 for testing enriched data
BRANCH_2 = f"projects/{project_id}/locations/global/catalogs/default_catalog/branches/2"
ENHANCED_TABLE_FULL_ID = f"{project_id}.retail.products_enhanced"

print(f"Loading enriched data from {ENHANCED_TABLE_FULL_ID} into Branch 2: {BRANCH_2}")

def load_products_to_branch_from_bq(branch_name, table_id, reconciliation_mode="FULL"):
    """
    Load products from BigQuery table into a specific catalog branch.
    
    Args:
        branch_name: Target branch (e.g., "projects/.../branches/2")
        table_id: BigQuery table ID (e.g., "products_enhanced" or "retail.products_enhanced")
        reconciliation_mode: "INCREMENTAL" or "FULL"
    """
    # Initialize the ProductService client
    client = retail_v2.ProductServiceClient()

    # Parse dataset_id and table_id_short from the full table_id string if needed
    # If table_id is in the form "project.dataset.table", split it
    # project_id is assumed to be the global project_id for the BQ source
    if "." in table_id:
        parts = table_id.split(".")
        if len(parts) == 3: # project.dataset.table
            _project_id_from_table_str, dataset_id, table_id_short = parts
        elif len(parts) == 2: # dataset.table
            dataset_id, table_id_short = parts
        else: # table (assuming default dataset 'retail')
            dataset_id = "retail" 
            table_id_short = table_id
    else: # table (assuming default dataset 'retail')
        dataset_id = "retail"
        table_id_short = table_id

    bq_source = BigQuerySource(
        project_id=project_id, # Use the global project_id for the BQ source
        dataset_id=dataset_id,
        table_id=table_id_short,
        data_schema="product" # Specifies that the BQ table schema matches the Retail API product schema
    )

    input_config = ProductInputConfig(
        big_query_source=bq_source
    )

    errors_config = ImportErrorsConfig(
        gcs_prefix=f"gs://{SCRIPTS_BUCKET}/retail_import_errors/" # GCS path for error logs
    )

    request = ImportProductsRequest(
        parent=branch_name,
        input_config=input_config,
        errors_config=errors_config,
        reconciliation_mode=reconciliation_mode,
        notification_pubsub_topic=None # Set to a Pub/Sub topic for notifications if needed
    )

    try:
        print(f"Starting import operation for branch: {branch_name}")
        print(f"Source table: {project_id}.{dataset_id}.{table_id_short}")
        print(f"Reconciliation mode: {reconciliation_mode}")
        operation = client.import_products(request=request)
        print(f"Import operation started. Operation name: {operation.operation.name}")
        print("Waiting for import operation to complete (timeout: 1800s)...")
        result = operation.result(timeout=1800) # Wait for the LRO to complete
        print("Import operation completed successfully!")
        print(f"Operation result: {result}")
        return operation.operation.name, result
    except Exception as e:
        print(f"Error during import operation: {e}")
        return None, None

def check_import_operation_status(operation_name):
    """
    Check the status of an import operation.
    """
    try:
        from google.cloud import retail_v2
        client = retail_v2.ProductServiceClient()
        # The operations_client is part of the ProductServiceClient's transport layer
        operation = client.transport.operations_client.get_operation(
            name=operation_name
        )
        print(f"Operation status: {operation}")
        return operation
    except Exception as e:
        print(f"Error checking operation status: {e}")
        return None

# Execute the data loading
print("=== Loading Enhanced Product Data to Branch 2 ===")

# Load data with INCREMENTAL mode. Change to "FULL" for a full replacement if needed.
# The table_id here refers to the table name within the 'retail' dataset of the current project_id.
operation_name, result = load_products_to_branch_from_bq(
    branch_name=BRANCH_2,
    table_id="products_enhanced", # This will be resolved to project_id.retail.products_enhanced
    reconciliation_mode="INCREMENTAL"
)

if operation_name:
    print(f"\nImport operation to Branch 2 initiated successfully.")
    print(f"Operation name: {operation_name}")
    if result: # Result is available if operation completed synchronously or LRO finished
        print(f"Import result details: {result}")
else:
    print("Import operation to Branch 2 failed to initiate or complete. Please check the error messages above.")

# Optional: Check operation status again if needed, especially if it was a long-running operation.
if operation_name:
    print("\n=== Checking Final Operation Status (if LRO was still pending) ===")
    final_status = check_import_operation_status(operation_name)

## Validation
Now that you have loaded the enriched product data into the Vertex AI Search for commerce catalog, you can proceed with the validation process. You use both automated and manual validation techniques. The validation tool helps you ensure that the enriched data meets the expected schema and values, and is ready for use in search and recommendation systems.

First, you need to identify the products that you enriched and loaded into the catalog, so that you can compare catalog search results for Branch 0 (original product data) and Branch 2 (enriched product data). The following cell displays enriched records; use their titles to pick search strings.

In [None]:
# Display 50 records from df_update where any extended attributes are not empty
non_empty_mask = (
    df_update['tags'].apply(lambda x: isinstance(x, list) and len(x) > 0) |
    df_update['patterns'].apply(lambda x: isinstance(x, list) and len(x) > 0) |
    df_update['sizes'].apply(lambda x: isinstance(x, list) and len(x) > 0) |
    df_update['brands'].apply(lambda x: isinstance(x, list) and len(x) > 0) |
    df_update['colorFamilies'].apply(lambda x: isinstance(x, list) and len(x) > 0) |
    df_update['colors'].apply(lambda x: isinstance(x, list) and len(x) > 0) |
    df_update['audience'].apply(lambda x: isinstance(x, dict) and (x.get('genders') or x.get('ageGroups')) and (len(x.get('genders', [])) > 0 or len(x.get('ageGroups', [])) > 0))
)
display(df_update[non_empty_mask].head(50))

While your results may vary, for the products in this lab, you might find that the following search strings return the enriched products:
- "tote"
- "bag"
- "android phone"
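
One way to derive search strings like these is to count the words that occur most often in the titles of enriched products. The following is a rough sketch on hypothetical titles; the stopword list and the `candidate_search_terms` helper are assumptions for illustration, not part of the lab.

```python
import re
from collections import Counter

# Small, hypothetical stopword list; extend as needed
STOPWORDS = {"the", "a", "an", "and", "for", "with", "of", "in"}

def candidate_search_terms(titles, top_n=3):
    """Return the top_n words by number of titles they appear in."""
    counts = Counter()
    for title in titles:
        # Count each word once per title
        words = set(re.findall(r"[a-z0-9]+", title.lower()))
        counts.update(words - STOPWORDS)
    return counts.most_common(top_n)

# Hypothetical titles standing in for the enriched records
titles = [
    "Google Canvas Tote Bag",
    "Android Phone Case",
    "Recycled Tote Bag",
    "Android Phone Stand",
]
terms = candidate_search_terms(titles, top_n=4)
print(terms)
```

The order among words with equal counts is not guaranteed, so treat the output as a set of candidates rather than a ranking.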

To have visually appealing results, use the [Vertex AI Search for commerce console](https://console.cloud.google.com/ai/retail/catalogs/default_catalog/evaluate/search) to perform the search. You can also use the Python script to perform the search, but using the console will allow you to easily visualize the results and compare them.

- Navigate to the [Vertex AI Search for commerce console](https://console.cloud.google.com/ai/retail/catalogs/default_catalog/evaluate/search)
- Select `Branch 2` from the dropdown `Select catalog branch`
- Enter the search string in the `Search query` field, use one of the search strings identified above
- Click the `Search` button

You should see the enriched products in the search results. To validate that the advanced search attributes have been populated correctly, choose `select all` in the `Facets` drop-down control. Earlier, you enabled site-wide controls that make all attributes searchable, indexable, and retrievable, so the facets are populated with values from the enriched product data.

You should see something like this in the search results:
![Enhanced search results](https://raw.githubusercontent.com/volenin/vaisc-csb/refs/heads/main/notebooks/img/data_management_enchanced_results.png)

Now run the same search against Branch 0 (original product data) and compare the results. You should see that the enriched products are not present in the search results for Branch 0, as they were not loaded into that branch.

![Basic search results](https://raw.githubusercontent.com/volenin/vaisc-csb/refs/heads/main/notebooks/img/data_management_basic_results.png)


# Conclusion
In this lab, you successfully demonstrated how to manage and enrich product data using Google Cloud services, particularly focusing on Vertex AI and BigQuery. You covered the following key steps:
-   **Data analysis**: You analyzed the existing product data to identify gaps and opportunities for enrichment.
-   **Data enrichment**: You utilized generative AI models to extract structured information from product titles, generating attributes such as brand names, descriptions, tags, and color information.
-   **Data population**: You created a new BigQuery table to store the enriched product data, ensuring that all original fields and newly generated attributes were included.
-   **Data validation**: You validated the enriched product data using the Vertex AI Search for commerce validation tool, ensuring that the data met your expectations and was ready for use in search and recommendation systems.

**Areas for improvement**:
-   **Image analysis**: In this lab, you focused on text data for simplicity. Future iterations could incorporate image analysis to extract additional attributes from product images, enhancing the catalog further.
-   **Attributes cleansing**: While you consolidated tags and extracted colors, further cleansing and normalization of attributes could improve data quality, for example by enforcing consistent naming conventions for brands and colors and by standardizing sizes, genders, and age groups.

**Operationalization**:
In this lab, you demonstrated an approach to improve product data quality and searchability in an iterative manner. The steps you followed can be operationalized in a production environment. One of the easiest ways to do this is to use [BigQuery pipelines](https://cloud.google.com/bigquery/docs/pipelines-introduction), which support direct execution of notebooks. Another approach is to transform the notebook into a Python script or PySpark job that can be scheduled to run periodically through Cloud Scheduler or BigQuery scheduled queries.

**Role of generative AI**:
Generative AI played a crucial role in this lab by enabling you to extract structured information from unstructured text data. In addition, the entire codebase in this notebook was generated using a Gemini model, which demonstrates the power of generative AI in automating and enhancing data management tasks. Operationalizing the code can also be done with the help of generative AI.
