# Extract Product Metadata

In [1]:
# CATEGORY = "Baby_Products"
CATEGORY = "Video_Games"

# Define sequence lengths
MIN_SEQUENCE_LENGTH = 3
MAX_SEQUENCE_LENGTH = 100  # Adjust as needed

In [2]:
import sys
from pathlib import Path

NOTEBOOK_DIR = Path.cwd()
PROJECT_ROOT = NOTEBOOK_DIR.parent
sys.path.append(str(PROJECT_ROOT))

# Data directory
DATA_DIR = Path(PROJECT_ROOT, "data")
DATA_DIR.mkdir(exist_ok=True)

In [3]:
import json
import logging
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import polars as pl
from dotenv import load_dotenv
from google import genai
from IPython.display import display
from tqdm.notebook import tqdm

from src.logger import setup_logger

# Set environment variables to reduce Google API verbosity
os.environ["GRPC_VERBOSITY"] = "ERROR"
os.environ["GLOG_minloglevel"] = "2"

logger = setup_logger("clean-items")
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("google_genai").setLevel(logging.WARNING)

In [4]:
load_dotenv()

True

In [5]:
# API settings
MODEL = "gemini-2.5-flash-lite"
# MODEL = "gemini-2.5-flash"
BATCH_SIZE: int = 10  # Items per API call (can process more titles at once)
MAX_RETRIES: int = 5
RETRY_DELAY: float = 1.0  # Seconds
RATE_LIMIT_DELAY: float = 0.1  # Seconds between API calls (faster for shorter text)
MAX_THREADS: int = 15  # Can use more threads for shorter text

# Text processing settings for titles
MAX_TITLE_LENGTH: int = 200  # Truncate titles before sending to API
MIN_TITLE_LENGTH: int = 10  # Minimum length to consider valid

## 2. Load Data and Analyze Titles

In [6]:
# LOAD ORIGINAL METADATA
metadata_output_path = DATA_DIR / "output" / f"{CATEGORY}_items.parquet"
item_df = pl.read_parquet(metadata_output_path)
logger.info(f"Loaded {len(item_df):,} items from {metadata_output_path}")

# LOAD CLEANED DESCRIPTIONS
cleaned_descriptions_path = DATA_DIR / "output" / f"{CATEGORY}_descriptions_clean.csv"
cleaned_descriptions_df = pl.read_csv(cleaned_descriptions_path)
cleaned_descriptions_df = cleaned_descriptions_df.unique(subset=["parent_asin"])
cleaned_descriptions_df = cleaned_descriptions_df.filter(pl.col("clean_description") != "NA")
logger.info(f"Loaded {len(cleaned_descriptions_df):,} cleaned descriptions from {cleaned_descriptions_path}")

# LOAD CLEANED TITLES
cleaned_titles_path = DATA_DIR / "output" / f"{CATEGORY}_titles_clean.csv"
cleaned_titles_df = pl.read_csv(cleaned_titles_path)
cleaned_titles_df = cleaned_titles_df.unique(subset=["parent_asin"])
cleaned_titles_df = cleaned_titles_df.filter(pl.col("clean_title") != "NA")
logger.info(f"Loaded {len(cleaned_titles_df):,} cleaned titles from {cleaned_titles_path}")

# MERGE DATAFRAMES
item_df = item_df.join(cleaned_descriptions_df, on="parent_asin", how="inner")
item_df = item_df.join(cleaned_titles_df, on="parent_asin", how="inner")
logger.info(f"Merged {len(item_df):,} items with cleaned descriptions and titles")

10:06:31 - Loaded 66,133 items from /home/zihao/llm/semantic-ids-llm/data/output/Video_Games_items.parquet
10:06:31 - Loaded 66,121 cleaned descriptions from /home/zihao/llm/semantic-ids-llm/data/output/Video_Games_descriptions_clean.csv
10:06:31 - Loaded 66,130 cleaned titles from /home/zihao/llm/semantic-ids-llm/data/output/Video_Games_titles_clean.csv
10:06:31 - Merged 66,118 items with cleaned descriptions and titles


In [7]:
# Clean up xml tags that remain
item_df = item_df.with_columns(
    pl.col("clean_description")
    .str.replace_all(r"</?[^>]+>", " ")  # Remove all tags
    .str.replace_all(r"\s+", " ")  # Collapse multiple spaces
    .str.strip_chars()  # Trim whitespace
    .alias("clean_description"),
    pl.col("clean_title")
    .str.replace_all(r"</?[^>]+>", " ")  # Remove all tags
    .str.replace_all(r"\s+", " ")  # Collapse multiple spaces
    .str.strip_chars()  # Trim whitespace
    .alias("clean_title"),
)

assert (
    len(
        item_df.filter(pl.col("clean_description").str.contains("clean_description")).select(
            ["description_text", "clean_description"]
        )
    )
    == 0
)
assert len(item_df.filter(pl.col("clean_title").str.contains("clean_title")).select(["title", "clean_title"])) == 0

In [8]:
item_df = item_df.rename(
    {
        "description_text": "original_description",
        "clean_description": "description_text",
        "title": "original_title",
        "clean_title": "title",
    }
)
item_df.head()

parent_asin,original_title,original_description,features_text,main_category,categories_text,store,average_rating,rating_number,price,item_context,description_text,title
str,str,str,str,str,str,str,str,i64,str,str,str,str
"""B00CX6FZCC""","""DeadPool [Online Game Code]""","""There are a few important thin…","""""","""Video Games""","""Video Games > PC > Games""","""ACTIVISION""","""3.2""",41,"""""","""Product: DeadPool [Online Game…","""Step into the role of a mercen…","""Deadpool Digital Game Code"""
"""B07XM5HSGK""","""HUYUN USB Mouse Cable Wire & F…","""Replacement for Razer DeathAdd…","""100% New High quality USB mous…","""Industrial & Scientific""","""Video Games > PC > Accessories…","""HUYUN""","""4.1""",24,"""9.99""","""Product: HUYUN USB Mouse Cable…","""This is a replacement mouse ca…","""Razer DeathAdder 3.5G 2013 Chr…"
"""B001EDF55W""","""RS3 - Racing Simulation Three""","""Buckle up and strap yourself i…","""Featuring 6 breathtaking racin…","""Video Games""","""Video Games > PC > Games""","""Ubisoft""","""5.0""",1,"""7.42""","""Product: RS3 - Racing Simulati…","""Experience the most pulse-poun…","""RS3 Racing Simulation Three"""
"""B004H0LQO8""","""Fishdom 5 - Game Pack - PC""","""We have included everything in…","""Build your own aquarium""","""Video Games""","""Video Games > PC > Games""","""21 Rocks""","""3.6""",96,"""""","""Product: Fishdom 5 - Game Pack…","""This collection includes all f…","""Fishdom 5 PC Game"""
"""B07HRWKLXX""","""Obeka Compatible with Soft Ant…","""Our soft case cover will add a…","""GOOD COMPATIBILITY - Our silic…","""Computers""","""Video Games > Xbox One > Acces…","""Obeka""","""4.1""",48,"""""","""Product: Obeka Compatible with…","""Enhance your Xbox One Elite Co…","""Obeka Silicone Cover Case with…"


In [9]:
item_df.head()

parent_asin,original_title,original_description,features_text,main_category,categories_text,store,average_rating,rating_number,price,item_context,description_text,title
str,str,str,str,str,str,str,str,i64,str,str,str,str
"""B00CX6FZCC""","""DeadPool [Online Game Code]""","""There are a few important thin…","""""","""Video Games""","""Video Games > PC > Games""","""ACTIVISION""","""3.2""",41,"""""","""Product: DeadPool [Online Game…","""Step into the role of a mercen…","""Deadpool Digital Game Code"""
"""B07XM5HSGK""","""HUYUN USB Mouse Cable Wire & F…","""Replacement for Razer DeathAdd…","""100% New High quality USB mous…","""Industrial & Scientific""","""Video Games > PC > Accessories…","""HUYUN""","""4.1""",24,"""9.99""","""Product: HUYUN USB Mouse Cable…","""This is a replacement mouse ca…","""Razer DeathAdder 3.5G 2013 Chr…"
"""B001EDF55W""","""RS3 - Racing Simulation Three""","""Buckle up and strap yourself i…","""Featuring 6 breathtaking racin…","""Video Games""","""Video Games > PC > Games""","""Ubisoft""","""5.0""",1,"""7.42""","""Product: RS3 - Racing Simulati…","""Experience the most pulse-poun…","""RS3 Racing Simulation Three"""
"""B004H0LQO8""","""Fishdom 5 - Game Pack - PC""","""We have included everything in…","""Build your own aquarium""","""Video Games""","""Video Games > PC > Games""","""21 Rocks""","""3.6""",96,"""""","""Product: Fishdom 5 - Game Pack…","""This collection includes all f…","""Fishdom 5 PC Game"""
"""B07HRWKLXX""","""Obeka Compatible with Soft Ant…","""Our soft case cover will add a…","""GOOD COMPATIBILITY - Our silic…","""Computers""","""Video Games > Xbox One > Acces…","""Obeka""","""4.1""",48,"""""","""Product: Obeka Compatible with…","""Enhance your Xbox One Elite Co…","""Obeka Silicone Cover Case with…"


In [10]:
item_df = item_df.with_columns(
    pl.concat_str(
        [
            pl.lit("Title: "),
            pl.col("title"),
            pl.lit("\nDescription: "),
            pl.col("description_text"),
            pl.lit("\nCategory: "),
            pl.col("categories_text"),
            pl.lit("\nFeatures: "),
            pl.col("features_text"),
        ]
    ).alias("item_context")
)

In [19]:
contexts = item_df.select("item_context").head(2).to_series()
for c in contexts:
    print(c)
    print("------")

Title: Deadpool Digital Game Code
Description: Step into the role of a mercenary with an accelerated healing factor in this third-person action-shooter. Battle for the safety of humans and mutants, wielding a devastating arsenal that includes katanas, guns, and explosives. String together satisfying combos and eviscerate your enemies with your impressive combat skills. Your X-Men pals also make an appearance, adding to the excitement. **System Requirements:** * **OS:** Windows XP, Vista, 7, or 8 * **Processor:** Intel Core 2 Duo E8200 @ 2.66 GHz or AMD Phenom X3 8750 * **RAM:** 2GB * **Hard Drive:** 6.6 GB Free Space * **Video Card:** GeForce 8800 GT or ATI Radeon HD4850 (512MB RAM) * **Additional:** DirectX 9.0c or later Requires Steam Client to activate.
Category: Video Games > PC > Games
Features: 
------
Title: Razer DeathAdder 3.5G 2013 Chroma Overwatch Mouse USB Cable Wire & Mouse Feet Replacement
Description: This is a replacement mouse cable and mouse feet set designed for spec

# Initialize Gemini client

In [14]:
client = genai.Client()
logger.info("Gemini client initialized successfully")

10:13:21 - Gemini client initialized successfully


In [15]:
try:
    test_response = client.models.generate_content(
        model=MODEL, contents="Say 'API connection successful' if you can read this."
    )
    logger.info(f"API Test: {test_response.text}")
except Exception as e:
    logger.error(f"Failed to initialize Gemini client: {e}")
    logger.error("Please ensure GEMINI_API_KEY environment variable is set")
    raise

10:13:26 - API Test: API connection successful


## 4. Define Metadata Extraction Functions

In [20]:
PROMPT = """
Extract structured metadata from the following video game product information. 
Analyze the title, description, category, and features to identify key attributes.

Product Information:
<product>
{item}
</product>

Return ONLY a JSON object with the following fields (use null for unknown/not applicable):
{{
  "product_type": "Game|Hardware|Accessory|DLC|Bundle|<other relevant type>|null",
  "platform": ["PS5", "PS4", "Xbox Series X", "Xbox One", "Nintendo Switch", "PC", "Steam Deck", <other platforms>] or null,
  "genre": ["Action", "RPG", "Strategy", "Sports", "Racing", "Roguelike", "Soulslike", "Metroidvania", "Battle Royale", "MOBA", "Auto Battler", "Gacha", "Idle", "Tower Defense", <any other relevant genres/subgenres>] or null for non-games,
  "hardware_type": "Console|Controller|Headset|Cable|Storage|Skin|Stand|Cooling|Capture Card|<other hardware types>|null" for non-hardware,
  "brand": "Primary manufacturer or brand name" or null,
  "multiplayer": "Single-player|Local Co-op|Online Multiplayer|MMO|MMORPG|PvP|PvE|Couch Co-op|<other multiplayer modes>|null",
  "franchise": "Game series or franchise name if applicable" or null,
}}

Guidelines:
- Use arrays for fields that can have multiple values (especially for genre and platform)
- Keep values concise but descriptive
- For platform, use common abbreviations (PS5, Xbox One, Switch, etc.)
- For genres, be SPECIFIC and COMPREHENSIVE:
  * Include main genres AND subgenres (e.g., ["RPG", "JRPG", "Turn-based"])
  * Use modern gaming terminology (Roguelike, Soulslike, Metroidvania, Battle Royale, etc.)
  * Include thematic descriptors when relevant (Open World, Sandbox, Survival, Horror, etc.)
  * Can combine multiple descriptors (e.g., "Open World MMORPG", "Tactical Turn-based RPG")
- Feel free to use gaming-specific terms that accurately describe the product
- The examples shown are NOT exhaustive - use whatever terms best describe the product
- Return valid JSON only
""".strip()

In [21]:
# EXTRACT METADATA
def extract_metadata(prompt: str, item: str, model: str = MODEL) -> dict:
    """
    Extract metadata from item context using Gemini API.

    Args:
        prompt: The extraction instructions prompt
        item: The product context to analyze
        model: The Gemini model to use (defaults to MODEL constant)

    Returns:
        Dictionary of extracted metadata or None if failed
    """
    attempt = 0
    while attempt < MAX_RETRIES:
        try:
            # Format the full prompt with the item context
            full_prompt = prompt.format(item=item)

            # Generate content using Gemini
            response = client.models.generate_content(model=model, contents=full_prompt)

            # Parse JSON response
            response_text = response.text.strip()

            # Clean up markdown code blocks if present
            if response_text.startswith("```json"):
                response_text = response_text[7:]
            if response_text.startswith("```"):
                response_text = response_text[3:]
            if response_text.endswith("```"):
                response_text = response_text[:-3]

            # Parse the JSON
            metadata = json.loads(response_text.strip())

            # Validate that we got a dictionary with expected structure
            if isinstance(metadata, dict):
                return metadata
            else:
                attempt += 1
                if attempt < MAX_RETRIES:
                    logger.warning(f"Invalid response structure, retrying... (attempt {attempt}/{MAX_RETRIES})")
                    time.sleep(RETRY_DELAY)
                continue

        except json.JSONDecodeError as e:
            attempt += 1
            if attempt < MAX_RETRIES:
                logger.warning(f"JSON parse error: {e}, retrying... (attempt {attempt}/{MAX_RETRIES})")
                time.sleep(RETRY_DELAY)
            else:
                logger.error(f"Failed to parse JSON after {MAX_RETRIES} attempts")
                return None
        except Exception as e:
            attempt += 1
            if attempt < MAX_RETRIES:
                logger.warning(f"Error extracting metadata: {e}, retrying... (attempt {attempt}/{MAX_RETRIES})")
                time.sleep(RETRY_DELAY)
            else:
                logger.error(f"Failed after {MAX_RETRIES} attempts: {e}")
                return None

    return None


# Test the function with a sample item
test_item = item_df.head(1).select("item_context").item()
metadata = extract_metadata(PROMPT, test_item)
logger.info(f"Sample item context (first 200 chars): {test_item[:200]}...")
logger.info(f"Extracted metadata: {json.dumps(metadata, indent=2)}")

10:17:18 - Sample item context (first 200 chars): Title: Deadpool Digital Game Code
Description: Step into the role of a mercenary with an accelerated healing factor in this third-person action-shooter. Battle for the safety of humans and mutants, wi...
10:17:18 - Extracted metadata: {
  "product_type": "Game",
  "platform": [
    "PC"
  ],
  "genre": [
    "Action",
    "Third-person Shooter",
    "Action-Adventure"
  ],
  "hardware_type": null,
  "brand": null,
  "multiplayer": "Single-player",
  "franchise": "Deadpool"
}


## 5. Parallel Metadata Extraction with Checkpointing

In [26]:
# Setup checkpoint file (using JSONL for structured data)
checkpoint_file = DATA_DIR / "output" / f"{CATEGORY}_metadata.jsonl"

# Load existing processed ASINs if checkpoint exists
processed_asins = set()
if checkpoint_file.exists():
    try:
        with open(checkpoint_file, "r", encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    data = json.loads(line)
                    processed_asins.add(data["parent_asin"])
        logger.info(f"Found {len(processed_asins):,} already processed items in checkpoint file")
    except Exception as e:
        logger.error(f"Error loading checkpoint: {e}")
        processed_asins = set()
else:
    # Create new empty JSONL file
    checkpoint_file.touch()
    logger.info(f"Created new checkpoint file: {checkpoint_file}")

# Filter to only unprocessed items
items_to_process = item_df.filter(~pl.col("parent_asin").is_in(processed_asins))
logger.info(f"Items to process: {len(items_to_process):,} out of {len(item_df):,} total items")

# Thread-safe file writing lock
file_lock = threading.Lock()


def process_single_item(item_data):
    """Process a single item and extract metadata."""
    asin, item_context = item_data

    try:
        # Extract metadata
        metadata = extract_metadata(PROMPT, item_context)

        if metadata:
            # Create result with ASIN
            result = {"parent_asin": asin, **metadata}

            # Write immediately to JSONL with lock
            with file_lock:
                with open(checkpoint_file, "a", encoding="utf-8") as f:
                    f.write(json.dumps(result) + "\n")

            # Rate limiting
            time.sleep(RATE_LIMIT_DELAY)
            return asin, metadata, True
        else:
            # Log failed extraction
            logger.warning(f"Failed to extract metadata for {asin}")

            # Write null result to maintain record
            result = {"parent_asin": asin, "extraction_failed": True}
            with file_lock:
                with open(checkpoint_file, "a", encoding="utf-8") as f:
                    f.write(json.dumps(result) + "\n")

            return asin, None, False

    except Exception as e:
        logger.error(f"Failed to process {asin}: {e}")
        # Write error record
        result = {"parent_asin": asin, "error": str(e)}
        with file_lock:
            with open(checkpoint_file, "a", encoding="utf-8") as f:
                f.write(json.dumps(result) + "\n")
        return asin, None, False


# Prepare items for processing
items = [(row["parent_asin"], row["item_context"]) for row in items_to_process.iter_rows(named=True)]

if len(items) > 0:
    logger.info(f"Starting parallel processing with {MAX_THREADS} workers...")

    # Process with ThreadPoolExecutor
    successful = 0
    failed = 0

    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        # Submit all tasks
        futures = {executor.submit(process_single_item, item): item for item in items}

        # Process completions with progress bar
        with tqdm(total=len(futures), desc="Extracting metadata") as pbar:
            for future in as_completed(futures):
                try:
                    asin, result, success = future.result()
                    if success:
                        successful += 1
                    else:
                        failed += 1
                    pbar.update(1)
                    pbar.set_postfix({"success": successful, "failed": failed})

                except Exception as e:
                    failed += 1
                    pbar.update(1)
                    pbar.set_postfix({"success": successful, "failed": failed})
                    logger.error(f"Future failed: {e}")

    logger.info(f"Processing complete! Successful: {successful:,}, Failed: {failed:,}")

else:
    logger.info("No items to process - all items already in checkpoint file!")

11:44:28 - Found 66,118 already processed items in checkpoint file
11:44:28 - Items to process: 0 out of 66,118 total items
11:44:28 - No items to process - all items already in checkpoint file!


In [27]:
# Load and verify the extracted metadata
if checkpoint_file.exists():
    # Load JSONL file into a list of dictionaries
    metadata_records = []
    with open(checkpoint_file, "r", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                try:
                    metadata_records.append(json.loads(line))
                except json.JSONDecodeError as e:
                    logger.warning(f"Skipping invalid JSON line: {e}")

    logger.info(f"Loaded {len(metadata_records):,} metadata records from {checkpoint_file}")

    # Display sample of extracted metadata
    logger.info("Sample of extracted metadata:")
    for i, record in enumerate(metadata_records[:3]):
        logger.info(f"\nItem {i + 1} (ASIN: {record.get('parent_asin', 'N/A')}):")
        for key, value in record.items():
            if key != "parent_asin":
                logger.info(f"  {key}: {value}")

    # Analyze metadata coverage without converting to DataFrame first
    logger.info("\nMetadata field coverage:")
    field_counts = {}
    for record in metadata_records:
        for key, value in record.items():
            if key not in ["parent_asin", "extraction_failed", "error"]:
                if key not in field_counts:
                    field_counts[key] = 0
                if value is not None:
                    field_counts[key] += 1

    for field, count in sorted(field_counts.items()):
        coverage = (count / len(metadata_records)) * 100
        logger.info(f"  {field}: {count:,} / {len(metadata_records):,} ({coverage:.1f}%)")

    # Analyze product types
    logger.info("\nProduct type distribution:")
    product_type_counts = {}
    for record in metadata_records:
        pt = record.get("product_type")
        if pt:
            product_type_counts[pt] = product_type_counts.get(pt, 0) + 1

    for pt, count in sorted(product_type_counts.items(), key=lambda x: x[1], reverse=True)[:10]:
        logger.info(f"  {pt}: {count:,}")

    # Analyze platforms (handling arrays)
    logger.info("\nPlatform distribution (flattened):")
    platform_counts = {}
    for record in metadata_records:
        platforms = record.get("platform")
        if platforms:
            if isinstance(platforms, list):
                for p in platforms:
                    platform_counts[p] = platform_counts.get(p, 0) + 1
            else:
                platform_counts[platforms] = platform_counts.get(platforms, 0) + 1

    for platform, count in sorted(platform_counts.items(), key=lambda x: x[1], reverse=True)[:15]:
        logger.info(f"  {platform}: {count:,}")

    # Analyze genres (handling arrays)
    logger.info("\nGenre distribution (flattened):")
    genre_counts = {}
    for record in metadata_records:
        genres = record.get("genre")
        if genres:
            if isinstance(genres, list):
                for g in genres:
                    genre_counts[g] = genre_counts.get(g, 0) + 1
            else:
                genre_counts[genres] = genre_counts.get(genres, 0) + 1

    for genre, count in sorted(genre_counts.items(), key=lambda x: x[1], reverse=True)[:20]:
        logger.info(f"  {genre}: {count:,}")

    # Show multiplayer modes distribution
    logger.info("\nMultiplayer mode distribution:")
    multiplayer_counts = {}
    for record in metadata_records:
        mp = record.get("multiplayer")
        if mp:
            if isinstance(mp, list):
                for m in mp:
                    multiplayer_counts[m] = multiplayer_counts.get(m, 0) + 1
            else:
                multiplayer_counts[mp] = multiplayer_counts.get(mp, 0) + 1

    for mp_mode, count in sorted(multiplayer_counts.items(), key=lambda x: x[1], reverse=True)[:10]:
        logger.info(f"  {mp_mode}: {count:,}")

else:
    logger.warning("No checkpoint file found. Run the processing cell first.")

11:44:39 - Loaded 66,118 metadata records from /home/zihao/llm/semantic-ids-llm/data/output/Video_Games_metadata.jsonl
11:44:39 - Sample of extracted metadata:
11:44:39 - 
Item 1 (ASIN: B001EDF55W):
11:44:39 -   product_type: Game
11:44:39 -   platform: ['PC']
11:44:39 -   genre: ['Simulation', 'Racing', 'Sports']
11:44:39 -   hardware_type: None
11:44:39 -   brand: None
11:44:39 -   multiplayer: Single-player
11:44:39 -   franchise: None
11:44:39 - 
Item 2 (ASIN: B08S7MCMM2):
11:44:39 -   product_type: Accessory
11:44:39 -   platform: ['PS5']
11:44:39 -   genre: None
11:44:39 -   hardware_type: Controller
11:44:39 -   brand: OLOPE
11:44:39 -   multiplayer: None
11:44:39 -   franchise: None
11:44:39 - 
Item 3 (ASIN: B001ELJP4G):
11:44:39 -   product_type: Accessory
11:44:39 -   platform: ['Wii']
11:44:39 -   genre: ['Racing']
11:44:39 -   hardware_type: Controller
11:44:39 -   brand: dreamGEAR
11:44:39 -   multiplayer: None
11:44:39 -   franchise: None
11:44:39 - 
Metadata field covera

## 6. Save Metadata as DataFrame

In [34]:
# Convert metadata to DataFrame format - handling mixed types properly
if checkpoint_file.exists() and len(metadata_records) > 0:
    # Filter out error records
    clean_records = [
        record for record in metadata_records if not record.get("extraction_failed") and not record.get("error")
    ]

    # Define which fields should be lists
    list_fields = ["platform", "genre", "compatibility"]
    EXPECTED_FIELDS = {"product_type", "platform", "genre", "hardware_type", "brand", "multiplayer", "franchise"}
    # Normalize records to ensure consistent types
    normalized_records = []
    for record in clean_records:
        filtered_record = {"parent_asin": record.get("parent_asin")}
        for key in EXPECTED_FIELDS:
            if key in record:
                filtered_record[key] = record[key]
        normalized = {}
        for key, value in filtered_record.items():
            if key in list_fields:
                # Ensure list fields are always lists
                if value is None:
                    normalized[key] = None
                elif isinstance(value, list):
                    normalized[key] = value
                else:
                    normalized[key] = [value]  # Convert single values to list
            else:
                # For non-list fields, ensure they're not lists
                if isinstance(value, list):
                    # If a non-list field has a list value, join it
                    try:
                        normalized[key] = ", ".join(value) if value else None
                    except Exception as e:
                        logger.warning(f"Error joining list for key: {key}, value: {value}")
                        normalized[key] = None
                else:
                    normalized[key] = value
        normalized_records.append(normalized)

    # Create DataFrame with normalized records
    metadata_df = pl.DataFrame(normalized_records)

    # Save as Parquet file (Parquet natively supports nested data types like lists)
    metadata_parquet_path = DATA_DIR / "output" / f"{CATEGORY}_metadata_extracted.parquet"
    metadata_df.write_parquet(metadata_parquet_path)
    logger.info(f"Saved metadata DataFrame with {len(metadata_df):,} records to {metadata_parquet_path}")

    # Display DataFrame info
    logger.info(f"\nDataFrame shape: {metadata_df.shape}")
    logger.info(f"Columns: {metadata_df.columns}")

    # Show sample of the DataFrame
    logger.info("\nSample of metadata DataFrame:")
    display(metadata_df.head(5))

    # Show data types (lists will show as List[Utf8])
    logger.info("\nColumn data types:")
    for col in metadata_df.columns:
        dtype = metadata_df[col].dtype
        null_count = metadata_df[col].null_count()
        non_null = len(metadata_df) - null_count
        logger.info(f"  {col}: {dtype} (non-null: {non_null}/{len(metadata_df)})")

    # Demonstrate how to work with array columns
    logger.info("\n--- Working with Array Columns ---")

    # Example 1: Explode genre array to see all genres
    if "genre" in metadata_df.columns:
        genres_exploded = (
            metadata_df.select(["parent_asin", "genre"]).filter(pl.col("genre").is_not_null()).explode("genre")
        )
        genre_counts = genres_exploded.group_by("genre").agg(pl.count().alias("count")).sort("count", descending=True)
        logger.info("\nExample 1 - Genre distribution (top 10):")
        display(genre_counts.head(10))

    # Example 2: Filter items by specific genre
    if "genre" in metadata_df.columns:
        # Find games with RPG genre
        rpg_games = metadata_df.filter(
            pl.col("genre").list.contains("RPG")
            | pl.col("genre").list.contains("JRPG")
            | pl.col("genre").list.contains("Action RPG")
        )
        logger.info(f"\nExample 2 - Games with RPG-related genres: {len(rpg_games)}")
        if len(rpg_games) > 0:
            display(rpg_games.select(["parent_asin", "product_type", "genre"]).head(3))

    # Example 3: Platform distribution
    if "platform" in metadata_df.columns:
        platforms_exploded = (
            metadata_df.select(["parent_asin", "platform"]).filter(pl.col("platform").is_not_null()).explode("platform")
        )
        platform_counts = (
            platforms_exploded.group_by("platform").agg(pl.count().alias("count")).sort("count", descending=True)
        )
        logger.info("\nExample 3 - Platform distribution:")
        display(platform_counts)

    # Example 4: Finding games by multiple criteria
    if "genre" in metadata_df.columns and "platform" in metadata_df.columns:
        # Find action games on PC
        action_pc_games = metadata_df.filter(
            (pl.col("product_type") == "Game")
            & (pl.col("platform").list.contains("PC"))
            & (pl.col("genre").list.contains("Action") | pl.col("genre").list.contains("Shooter"))
        )
        logger.info(f"\nExample 4 - Action/Shooter games on PC: {len(action_pc_games)}")
        if len(action_pc_games) > 0:
            display(action_pc_games.select(["parent_asin", "genre", "platform"]).head(3))

else:
    logger.warning("No metadata records to convert to DataFrame")

14:19:19 - Saved metadata DataFrame with 66,117 records to /home/zihao/llm/semantic-ids-llm/data/output/Video_Games_metadata_extracted.parquet
14:19:19 - 
DataFrame shape: (66117, 8)
14:19:19 - Columns: ['parent_asin', 'genre', 'brand', 'product_type', 'platform', 'hardware_type', 'franchise', 'multiplayer']
14:19:19 - 
Sample of metadata DataFrame:


parent_asin,genre,brand,product_type,platform,hardware_type,franchise,multiplayer
str,list[str],str,str,list[str],str,str,str
"""B001EDF55W""","[""Simulation"", ""Racing"", ""Sports""]",,"""Game""","[""PC""]",,,"""Single-player"""
"""B08S7MCMM2""",,"""OLOPE""","""Accessory""","[""PS5""]","""Controller""",,
"""B001ELJP4G""","[""Racing""]","""dreamGEAR""","""Accessory""","[""Wii""]","""Controller""",,
"""B0171WA496""",,"""GameXcel""","""Accessory""","[""Xbox One""]","""Skin""",,
"""B07HRWKLXX""",,"""Obeka""","""Accessory""","[""Xbox One""]","""Controller Skin""","""Xbox One Elite Controller""",


14:19:19 - 
Column data types:
14:19:19 -   parent_asin: String (non-null: 66117/66117)
14:19:19 -   genre: List(String) (non-null: 32970/66117)
14:19:19 -   brand: String (non-null: 51285/66117)
14:19:19 -   product_type: String (non-null: 66117/66117)
14:19:19 -   platform: List(String) (non-null: 64329/66117)
14:19:19 -   hardware_type: String (non-null: 34145/66117)
14:19:19 -   franchise: String (non-null: 35863/66117)
14:19:19 -   multiplayer: String (non-null: 30613/66117)
14:19:19 - 
--- Working with Array Columns ---
14:19:19 - 
Example 1 - Genre distribution (top 10):


(Deprecated in version 0.20.5)
  genre_counts = genres_exploded.group_by("genre").agg(pl.count().alias("count")).sort("count", descending=True)


genre,count
str,u32
"""Action""",10970
"""Adventure""",6885
"""Simulation""",5624
"""Puzzle""",5398
"""Strategy""",4275
"""Sports""",3591
"""RPG""",3099
"""Racing""",2610
"""Open World""",2092
"""Platformer""",1998


14:19:19 - 
Example 2 - Games with RPG-related genres: 3785


parent_asin,product_type,genre
str,str,list[str]
"""B072282HLF""","""Game""","[""Fantasy"", ""Adventure"", … ""Open World""]"
"""B001EYURIQ""","""Game""","[""RPG"", ""Fantasy"", ""Action RPG""]"
"""B07T6V433D""","""Bundle""","[""Action"", ""Adventure"", … ""Mystery""]"


14:19:19 - 
Example 3 - Platform distribution:


(Deprecated in version 0.20.5)
  platforms_exploded.group_by("platform").agg(pl.count().alias("count")).sort("count", descending=True)


platform,count
str,u32
"""PC""",22242
"""PS4""",7950
"""Nintendo Switch""",6103
"""Xbox One""",5939
"""Xbox 360""",3886
…,…
"""UMD""",1
"""PC (Windows)""",1
"""GameCube (via adapter)""",1
"""Genesis Mini""",1


14:19:19 - 
Example 4 - Action/Shooter games on PC: 2110


parent_asin,genre,platform
str,list[str],list[str]
"""B00CX6FZCC""","[""Action"", ""Third-person Shooter""]","[""PC""]"
"""B000BU0FUY""","[""Action"", ""3D Adventure""]","[""PC""]"
"""B001K7HV18""","[""Platformer"", ""Action"", ""Arcade""]","[""PC""]"


In [35]:
metadata_df.filter(pl.col("multiplayer").is_not_null())

parent_asin,genre,brand,product_type,platform,hardware_type,franchise,multiplayer
str,list[str],str,str,list[str],str,str,str
"""B001EDF55W""","[""Simulation"", ""Racing"", ""Sports""]",,"""Game""","[""PC""]",,,"""Single-player"""
"""B00EPC3PY2""","[""Action"", ""Musou""]","""Bandai Namco""","""Game""","[""PS3""]",,"""One Piece""","""Single-player"""
"""B00CX6FZCC""","[""Action"", ""Third-person Shooter""]",,"""Game""","[""PC""]",,"""Deadpool""","""Single-player"""
"""B004H0LQO8""","[""Puzzle"", ""Simulation""]","""Fishdom""","""Bundle""","[""PC""]",,"""Fishdom""","""Single-player"""
"""B002BSA1HQ""","[""Rail Shooter"", ""Action""]","""Nerf""","""Bundle""","[""Wii""]","""Controller""","""N-Strike Elite""","""Local Co-op"""
…,…,…,…,…,…,…,…
"""B075YGQ393""","[""Action RPG"", ""Adventure"", ""Hack and Slash""]","""Falcom""","""Game""","[""PS Vita""]",,"""Ys""","""Single-player"""
"""B006QRNKOO""","[""Action-Adventure"", ""Action"", … ""Open World""]","""Nintendo""","""Game""","[""Wii""]",,"""The Legend of Zelda""","""Single-player"""
"""B00UW4BGLG""","[""Sports"", ""Baseball""]","""R.B.I. Baseball""","""Game""","[""PS4""]",,"""R.B.I. Baseball""","""Single-player, Local Co-op, On…"
"""B001J2S0WI""","[""RPG"", ""MMORPG"", ""Fantasy""]","""ArenaNet""","""Bundle""","[""PC""]",,"""Guild Wars""","""Online Multiplayer, MMORPG, Pv…"


In [36]:
metadata_df.filter(pl.col("multiplayer").is_not_null())

parent_asin,genre,brand,product_type,platform,hardware_type,franchise,multiplayer
str,list[str],str,str,list[str],str,str,str
"""B001EDF55W""","[""Simulation"", ""Racing"", ""Sports""]",,"""Game""","[""PC""]",,,"""Single-player"""
"""B00EPC3PY2""","[""Action"", ""Musou""]","""Bandai Namco""","""Game""","[""PS3""]",,"""One Piece""","""Single-player"""
"""B00CX6FZCC""","[""Action"", ""Third-person Shooter""]",,"""Game""","[""PC""]",,"""Deadpool""","""Single-player"""
"""B004H0LQO8""","[""Puzzle"", ""Simulation""]","""Fishdom""","""Bundle""","[""PC""]",,"""Fishdom""","""Single-player"""
"""B002BSA1HQ""","[""Rail Shooter"", ""Action""]","""Nerf""","""Bundle""","[""Wii""]","""Controller""","""N-Strike Elite""","""Local Co-op"""
…,…,…,…,…,…,…,…
"""B075YGQ393""","[""Action RPG"", ""Adventure"", ""Hack and Slash""]","""Falcom""","""Game""","[""PS Vita""]",,"""Ys""","""Single-player"""
"""B006QRNKOO""","[""Action-Adventure"", ""Action"", … ""Open World""]","""Nintendo""","""Game""","[""Wii""]",,"""The Legend of Zelda""","""Single-player"""
"""B00UW4BGLG""","[""Sports"", ""Baseball""]","""R.B.I. Baseball""","""Game""","[""PS4""]",,"""R.B.I. Baseball""","""Single-player, Local Co-op, On…"
"""B001J2S0WI""","[""RPG"", ""MMORPG"", ""Fantasy""]","""ArenaNet""","""Bundle""","[""PC""]",,"""Guild Wars""","""Online Multiplayer, MMORPG, Pv…"
