In [1]:
!pip install openai==1.93.0

Defaulting to user installation because normal site-packages is not writeable


In [2]:
from dotenv import load_dotenv
import os

# Load environment variables from the specified path
load_dotenv(dotenv_path='/home/sparky/.dbt/.env')

True

In [3]:
import polars as pl
from datetime import datetime
from openai import OpenAI
import pickle
import numpy as np
from tqdm import tqdm
import concurrent.futures
import time
import os
import json

## Read Raw Data

In [4]:
raw_path = '/opt/spark/work-dir/data/exists_data/raw_embedding_demo.csv'

raw_df = pl.read_csv(source=raw_path, has_header=True)
raw_df.head()

product_name,sale_price,ingest_timestamp_utc,ecommerce_name
str,f64,str,str
"""DermaSense Targeted Dark Spot …",947.0,"""2025-06-19 3:03:30""","""ecommerce_01"""
"""DermaSense pH Balance Gentle S…",351.0,"""2025-06-19 3:03:30""","""ecommerce_02"""
"""ActiveVita Youthful Skin Elixi…",369.0,"""2025-06-19 3:03:30""","""ecommerce_02"""
"""DermaSense pH Balance แชมพูอ่อ…",351.0,"""2025-06-19 3:03:30""","""ecommerce_01"""
"""DermaSense Luminous Exfoliatin…",1215.0,"""2025-06-19 3:03:30""","""ecommerce_01"""


In [5]:
em_01 = raw_df.filter(pl.col("ecommerce_name") == 'ecommerce_01')
em_02 = raw_df.filter(pl.col("ecommerce_name") == 'ecommerce_02')

## Setup & Helper Function

In [6]:
# Ensure OPENAI_API_KEY environment variable is set
client = OpenAI()

In [7]:
# Define function: generate text embeddings
def generate_embeddings(text):
    """
    Generate embeddings for input text using OpenAI's text-embedding-3-small model.

    Args:
        text (str or list[str]): The text or list of texts to embed.

    Returns:
        list or list[list]: A list of embeddings, or a list of lists of embeddings if input was a list.
    """
    if not text:
        return []
    
    response = client.embeddings.create(
        input=text,
        model="text-embedding-3-small"
    )
    embeddings = [record.embedding for record in response.data]
    return embeddings if isinstance(text, list) else embeddings[0]

In [8]:
# Set embedding cache path
embedding_cache_path = "embeddings_cache.pkl"

# Try to load cached embeddings
try:
    with open(embedding_cache_path, 'rb') as f:
        embedding_cache = pickle.load(f)
except FileNotFoundError:
    embedding_cache = {}

In [9]:
# Define function: obtain text embeddings through caching mechanism
def embedding_from_string(string: str, embedding_cache=embedding_cache) -> list:
    """
    Get embedding for given text, using cache mechanism to avoid recomputation.

    Args:
        string (str): The input text to get the embedding for.
        embedding_cache (dict): A dictionary used as a cache for embeddings.

    Returns:
        list: The embedding vector for the input string.
    """
    if string not in embedding_cache.keys():
        embedding_cache[string] = generate_embeddings(string)
        with open(embedding_cache_path, "wb") as embedding_cache_file:
            pickle.dump(embedding_cache, embedding_cache_file)
    return embedding_cache[string]


## Embed base product name

In [10]:
# Define a dynamic batch size. You can adjust this value based on API limits and performance.
BATCH_SIZE = 60

print("Preparing base by generating embeddings for all base products...")
# Get all product names from the base dataframe
base_names = em_02.select('product_name').to_series().to_list()

# 1. Identify which product names are new and need to be embedded.
names_to_embed = [name for name in base_names if name not in embedding_cache]

Preparing base by generating embeddings for all base products...


In [11]:
# 2. If there are any new names, process them in manageable batches.
if names_to_embed:
    total_batches = -(-len(names_to_embed) // BATCH_SIZE)  # Ceiling division to calculate total batches
    print(f"Found {len(names_to_embed)} new products to embed. Processing in {total_batches} batches of up to {BATCH_SIZE} items each...")
    
    # Iterate through the new names in chunks of BATCH_SIZE
    for i in range(0, len(names_to_embed), BATCH_SIZE):
        current_batch_num = (i // BATCH_SIZE) + 1
        
        # Define the current batch of names
        batch_names = names_to_embed[i:i + BATCH_SIZE]
        print(f"  - Processing batch {current_batch_num}/{total_batches} ({len(batch_names)} items)...")
        
        # Generate embeddings for the current batch
        batch_embeddings = generate_embeddings(batch_names)
        
        # 3. Update the cache with the newly generated embeddings from this batch
        for name, embedding in zip(batch_names, batch_embeddings):
            embedding_cache[name] = embedding

        # 4. Save the updated cache to disk after each batch. This adds robustness.
        #    If the script fails, progress from completed batches is not lost.
        with open(embedding_cache_path, "wb") as embedding_cache_file:
            pickle.dump(embedding_cache, embedding_cache_file)
        print(f"  - Batch {current_batch_num} processed and cache saved.")
            
    print("All new embeddings have been generated and cached.")

# 5. Retrieve all embeddings from the cache to ensure the list is complete and in the correct order.
base_embeddings = [embedding_cache[name] for name in base_names]

In [21]:
data_for_df = {
    'product_name': base_names,
    'embedding': base_embeddings
}

# 2. Now, create the DataFrame from the dictionary
product_embeddings_df = pl.DataFrame(data_for_df)
product_embeddings_df.head()

product_name,embedding
str,list[f64]
"""DermaSense pH Balance Gentle S…","[0.047305, -0.000145, … -0.017072]"
"""ActiveVita Youthful Skin Elixi…","[0.0062, 0.015389, … -0.002123]"
"""DermaSense Acne Control Gel-to…","[0.005794, 0.001892, … -0.011013]"
"""ActiveVita Ocean Omega-3 Salmo…","[-0.009985, 0.024691, … 0.007499]"
"""DermaSense Aqua-Hydrate Soothi…","[0.054816, 0.011955, … 0.017542]"


## Find Best Match

In [22]:
# --- 1. MODIFIED CORE MATCHING LOGIC ---
def find_best_match_in_context(query_name, context_embeddings_np):
    """
    Finds the single most similar product in the context catalog for a given query name.

    Args:
        query_name (str): The name of the product to query.
        context_embeddings_np (numpy.ndarray): NumPy array of embeddings for products in the context.

    Returns:
        tuple: A tuple containing:
               - best_index (int): Index of the single most similar product in the context.
               - best_similarity (float): Cosine similarity score for the best match.
    """
    # Generate embedding for the query name
    query_embedding = embedding_from_string(query_name)

    # Calculate cosine similarity between the query embedding and all context embeddings
    similarities = np.dot(context_embeddings_np, query_embedding) / \
                   (np.linalg.norm(context_embeddings_np, axis=1) * np.linalg.norm(query_embedding))

    # --- CHANGE: Use argmax to find the single best match directly ---
    best_index = np.argmax(similarities)
    best_similarity = similarities[best_index]

    return best_index, best_similarity

In [23]:
# --- 2. MODIFIED BATCH API PROCESSING LOGIC ---
def prepare_batch_verification_file(product_chunk, context_df, context_embeddings_np, batch_file_path):
    """
    Prepares a JSONL file for the OpenAI Batch API with one-to-one verification requests.
    This version instructs the LLM to verify a single candidate and return a simple JSON object.
    """
    custom_id_to_product_map = {}
    with open(batch_file_path, 'w') as f:
        for i, row in enumerate(product_chunk):
            query_product_name = row.get('product_name')
            if not query_product_name:
                continue

            # --- CHANGE: Find only the single best candidate match ---
            match_index, _ = find_best_match_in_context(query_product_name, context_embeddings_np)
            matched_product_name = context_df.row(match_index, named=True)['product_name']

            custom_id = f"request_{os.path.basename(batch_file_path)}_{i}"
            custom_id_to_product_map[custom_id] = {
                'original_row': row,
                'candidate_match': matched_product_name,
                'match_index': match_index
            }

            # --- CHANGE: Updated prompt for direct one-to-one comparison ---
            prompt = f"""You are a precise product matching assistant. Your task is to determine if the 'Base Product' and the 'Candidate Product' are the same item.

Respond in a valid JSON format with a single key:
1. "match_found": a boolean (true or false).

Base Product:
'{query_product_name}'

Candidate Product:
'{matched_product_name}'
"""

            # Create the JSON object for the batch file
            json_request = {
                "custom_id": custom_id,
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-4o-mini",
                    "messages": [
                        {"role": "system", "content": "You are a helpful assistant that responds in JSON format."},
                        {"role": "user", "content": prompt}
                    ],
                    "max_tokens": 50,  # Reduced token need for simpler JSON
                    "temperature": 0.0,
                    "response_format": {"type": "json_object"}
                }
            }
            f.write(json.dumps(json_request) + '\n')
            
    return custom_id_to_product_map

In [24]:
def run_batch_job_and_get_results(batch_file_path):
    """Uploads a file, runs a batch job, waits for completion, and returns results."""
    if not os.path.exists(batch_file_path) or os.path.getsize(batch_file_path) == 0:
        print("Batch file is empty or does not exist. Skipping.")
        return None

    # 1. Upload the file
    print(f"Uploading batch file: {batch_file_path}")
    batch_input_file = client.files.create(file=open(batch_file_path, "rb"), purpose="batch")
    
    # 2. Create the batch job
    batch_job = client.batches.create(
        input_file_id=batch_input_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h"
    )
    print(f"Batch job created: {batch_job.id}. Waiting for completion...")

    # 3. Poll for completion
    while batch_job.status not in ["completed", "failed", "cancelled"]:
        time.sleep(30)
        batch_job = client.batches.retrieve(batch_job.id)
        print(f"  - Job status: {batch_job.status}")

    if batch_job.status != "completed":
        print(f"Batch job did not complete successfully. Status: {batch_job.status}")
        return None

    # 4. Retrieve and return results
    output_file_id = batch_job.output_file_id
    result_content = client.files.content(output_file_id).read()
    
    results = [json.loads(line) for line in result_content.decode('utf-8').strip().split('\n')]
    return results

In [25]:
# --- 3. MODIFIED MAIN PROCESSING SCRIPT ---

# Define the batch size
BATCH_SIZE = 100 # Can potentially be larger with a simpler process
# --- CHANGE: Reflecting user's variable names for clarity ---
# df_query corresponds to the new products to be checked.
# df_base corresponds to the existing product catalog (like 'product_embeddings_df').
df_query = em_01 
df_base = product_embeddings_df # Using the name from the user's second snippet
base_embeddings_np = np.array(df_base['embedding'].to_list())

In [26]:
results_list = []
query_rows = list(df_query.iter_rows(named=True))

print(f"Starting product matching in sequential batches of {BATCH_SIZE}...")

Starting product matching in sequential batches of 100...


In [27]:
results_list

[]

In [28]:
# Process the query dataframe in sequential chunks
for i in range(0, len(query_rows), BATCH_SIZE):
    product_chunk = query_rows[i:i + BATCH_SIZE]
    chunk_num = (i // BATCH_SIZE) + 1
    print(f"\n--- Processing Chunk {chunk_num} ({len(product_chunk)} products) ---")
    
    # 1. Prepare the batch file with the simplified one-to-one logic
    batch_file_path = f"batch_input_chunk_{chunk_num}.jsonl"
    product_map = prepare_batch_verification_file(product_chunk, df_base, base_embeddings_np, batch_file_path)

    # 2. Run the batch job
    batch_results = run_batch_job_and_get_results(batch_file_path)
    
    if not batch_results:
        print(f"Skipping result processing for Chunk {chunk_num} due to batch failure or empty input.")
        continue

    # 3. Process the results with simplified JSON parsing
    for result in batch_results:
        custom_id = result['custom_id']
        original_data = product_map.get(custom_id)
        
        if not original_data:
            continue
        
        try:
            llm_output_str = result['response']['body']['choices'][0]['message']['content']
            llm_response = json.loads(llm_output_str)
            is_match = llm_response.get('match_found', False)

        except (json.JSONDecodeError, KeyError, IndexError, TypeError) as e:
            print(f"Could not parse response for {custom_id}: {e}. Treating as no match.")
            is_match = False

        query_row = original_data['original_row']
        query_product_name = query_row['product_name']
        query_product_price = query_row['sale_price']
        
        # Get info for the single candidate match
        top_match_idx = original_data['match_index']
        top_match_info = df_base.row(top_match_idx, named=True)
            
        # Recalculate similarity for reporting
        query_embedding = embedding_from_string(query_product_name)
        match_embedding = base_embeddings_np[top_match_idx]
        similarity_score = np.dot(match_embedding, query_embedding) / (np.linalg.norm(match_embedding) * np.linalg.norm(query_embedding))

        results_list.append({
            'base_product_name (from line)': query_product_name,
            'base_product_name (from watson)': top_match_info['product_name'],
            'price_from_line': query_product_price,
            'price_from_watson': top_match_info['sale_price'],
            'similarity': float(similarity_score),
            'verified_match': is_match # Directly use the boolean from the LLM
        })

    os.remove(batch_file_path)

print("\n--- All chunks processed. Final results compiled. ---")


--- Processing Chunk 1 (100 products) ---
Uploading batch file: batch_input_chunk_1.jsonl
Batch job created: batch_685fafff1a308190901e6c2cec8c9125. Waiting for completion...
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job status: in_progress
  - Job stat

KeyError: 'sale_price'

In [None]:
# --- FINAL STEP: CONVERT RESULTS TO DATAFRAME AND SAVE TO CSV ---
if results_list:
    print("Converting results to a Polars DataFrame...")
    results_df = pl.DataFrame(results_list)

    output_filename = "product_matching_results_simplified.csv"
    try:
        results_df.write_csv(output_filename)
        print(f"Successfully saved {len(results_df)} results to '{output_filename}'")
    except Exception as e:
        print(f"Error saving results to CSV: {e}")
else:
    print("No results were generated to save.")