## Demo code to explore the dataset and create embeddings

In [None]:
# prepare your openai and pinecone api keys
from openai import OpenAI
from pinecone import Pinecone

### Some basic EDA

In [None]:
import pyarrow.parquet as pq
# Open the Parquet file and print its file-level metadata (row/column counts, row groups, writer version)
parquet_file = pq.ParquetFile('data/amazon_fashion_clean_051624.parquet') # modify the path to the parquet file
print(parquet_file.metadata)

<pyarrow._parquet.FileMetaData object at 0x000001999A03D800>
  created_by: parquet-cpp-arrow version 11.0.0
  num_columns: 562
  num_rows: 776976
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 129583


In [None]:
# Stream the file `batch_size` rows at a time instead of loading every row into memory
batch_size = 1000
iterator = parquet_file.iter_batches(batch_size=batch_size)

In [None]:
# load one batch (the next `batch_size` rows from the iterator)

batch = next(iterator)
print(batch)  # prints the batch schema, including the very wide nested `details` struct

pyarrow.RecordBatch
title: string
average_rating: double
rating_number: int64
features: string
description: string
price: double
images: list<item: struct<hi_res: string, large: string, thumb: string, variant: string>>
  child 0, item: struct<hi_res: string, large: string, thumb: string, variant: string>
      child 0, hi_res: string
      child 1, large: string
      child 2, thumb: string
      child 3, variant: string
store: string
details: struct<: string, ABPA Partslink Number: string, ASTM Fluid Rating: string, Active Ingredients: string, Additional product features: string, Age Range (Description): string, Age Range Description: string, Alarm Clock: string, Antibacterial treatment: string, Arch Type: string, Are Batteries Included: string, Aspect Ratio: string, Assembled Depth: string, Assembled Height: string, Assembled Length: string, Assembled Width: string, Assembly Required: string, Assembly required: string, Auto Part Position: string, Auto Shutoff: string, Back Material T

In [None]:
# load batch into pandas dataframe and display its column names
df = batch.to_pandas()
df.columns

Index(['title', 'average_rating', 'rating_number', 'features', 'description',
       'price', 'images', 'store', 'details', 'parent_asin',
       'title_review_agg', 'user_id', 'timestamp', 'avg_rating_reviewers',
       'coefvar_rating_reviewers', 'text_agg', 'text_weighted_agg',
       'images_review_cln'],
      dtype='object')

- `title` and `description` are used for the text embedding.

- The `images` column needs some processing to extract the image URLs before embedding.

In [None]:
# Inspect the image entries of the first product: each is a dict of hi_res/large/thumb URLs plus a variant label
df['images'][0]

array([{'hi_res': 'https://m.media-amazon.com/images/I/61hlxr+VAoL._AC_UL1000_.jpg', 'large': 'https://m.media-amazon.com/images/I/51nk4aYpnbL._AC_.jpg', 'thumb': 'https://m.media-amazon.com/images/I/51nk4aYpnbL._AC_US40_.jpg', 'variant': 'MAIN'},
       {'hi_res': 'https://m.media-amazon.com/images/I/61RwNTNuZOL._AC_UL1000_.jpg', 'large': 'https://m.media-amazon.com/images/I/41EW-AJGmEL._AC_.jpg', 'thumb': 'https://m.media-amazon.com/images/I/41EW-AJGmEL._AC_US40_.jpg', 'variant': 'PT01'},
       {'hi_res': 'https://m.media-amazon.com/images/I/51rWHbgwIiL._AC_UL1000_.jpg', 'large': 'https://m.media-amazon.com/images/I/415DEob2AnL._AC_.jpg', 'thumb': 'https://m.media-amazon.com/images/I/415DEob2AnL._AC_US40_.jpg', 'variant': 'PT02'},
       {'hi_res': 'https://m.media-amazon.com/images/I/51rWHbgwIiL._AC_UL1000_.jpg', 'large': 'https://m.media-amazon.com/images/I/415DEob2AnL._AC_.jpg', 'thumb': 'https://m.media-amazon.com/images/I/415DEob2AnL._AC_US40_.jpg', 'variant': 'PT03'},
       {

In [None]:
# an example to extract the first large image from the images column;
# rows whose image list is empty or null get None instead of raising
df['first_large_image'] = df['images'].apply(
    lambda imgs: imgs[0]['large'] if imgs is not None and len(imgs) > 0 else None
)
df['first_large_image']

0      https://m.media-amazon.com/images/I/51nk4aYpnb...
1      https://m.media-amazon.com/images/I/51b-BnwE4r...
2      https://m.media-amazon.com/images/I/41nYxugn7f...
3      https://m.media-amazon.com/images/I/41NLo0H7B1...
4      https://m.media-amazon.com/images/I/31Epv0yCkL...
                             ...                        
995    https://m.media-amazon.com/images/I/41gVB2l1WP...
996    https://m.media-amazon.com/images/I/51b7SaP2DB...
997    https://m.media-amazon.com/images/I/31uPzIRRE6...
998    https://m.media-amazon.com/images/I/416BnKa-fL...
999    https://m.media-amazon.com/images/I/31+VOH41Hc...
Name: first_large_image, Length: 1000, dtype: object

## Text embedding with "text-embedding-3-large"

### Check that the OpenAI API works for you

In [None]:
from openai import OpenAI
# With no arguments, OpenAI() picks up the key from the OPENAI_API_KEY environment
# variable; prefer that over pasting the key into the notebook.
client = OpenAI()

In [None]:
# OpenAI embedding model used for the product text
EMBEDDING_MODEL = "text-embedding-3-large"

In [None]:
def create_embedding(text):
    """Embed one string with the configured OpenAI model.

    Returns the embedding vector (a list of floats) of the first result item.
    Any API error propagates to the caller.
    """
    result = client.embeddings.create(input=text, model=EMBEDDING_MODEL)
    first_item = result.data[0]
    return first_item.embedding

In [None]:
# Sanity check: embed a short string and print the embedding dimension
test_embed = create_embedding("Hello, world!")
print(len(test_embed))

3072


## Full implementation

In [None]:
import os

# Read the Pinecone key from the environment instead of hard-coding it in the
# notebook; the placeholder is only a fallback when PINECONE_API_KEY is unset.
pinecone_api_key = os.environ.get("PINECONE_API_KEY", "your_pinecone_api_key_here")
pc = Pinecone(api_key=pinecone_api_key)
index = pc.Index("your_index_name_here")  # replace with your index name

In [None]:
import pyarrow.parquet as pq
from openai import OpenAI
import concurrent.futures
import pandas as pd
import logging
import time

import os

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# OpenAI() reads the key from the OPENAI_API_KEY environment variable by default
client = OpenAI()
EMBEDDING_MODEL = "text-embedding-3-large"
# Read the Pinecone key from the environment rather than hard-coding it; the
# placeholder is only a fallback when PINECONE_API_KEY is unset.
pinecone_api_key = os.environ.get("PINECONE_API_KEY", "your_pinecone_api_key_here")
pc = Pinecone(api_key=pinecone_api_key)
pinecone_index = pc.Index("your_index_name_here")  # replace with your index name


In [None]:
def fetch_existing_ids_from_pinecone(pinecone_index):
    """Collect every vector ID currently stored in ``pinecone_index``.

    ``pinecone_index.list()`` is consumed page by page; every ID of every page
    is gathered into a single set.
    """
    return {vector_id for page in pinecone_index.list() for vector_id in page}

def read_ids_from_dataset(parquet_file):
    """Return the set of all ``parent_asin`` values in ``parquet_file``.

    Only the ``parent_asin`` column is read, 5000 rows at a time.
    """
    dataset_ids = set()
    id_batches = parquet_file.iter_batches(columns=['parent_asin'], batch_size=5000)
    for id_batch in id_batches:
        dataset_ids.update(id_batch.to_pandas()['parent_asin'])
    return dataset_ids


In [None]:
# `read_ids_from_dataset` and `fetch_existing_ids_from_pinecone` are defined in the previous cell.
dataset_ids = read_ids_from_dataset(parquet_file)

# Skip products that are already in the index (e.g. from an interrupted earlier run)
existing_ids = fetch_existing_ids_from_pinecone(pinecone_index)
missing_ids = dataset_ids.difference(existing_ids)

# Set of target product IDs
target_product_ids = missing_ids  # Fill with dataset_ids if you want to index all products

In [None]:
# Function to create embedding using OpenAI; failures are logged and mapped to None
def create_embedding(text):
    """Embed ``text`` with ``EMBEDDING_MODEL``.

    Returns the embedding vector, or ``None`` if the request (or reading the
    response) raised; the error is logged rather than propagated so one bad
    row does not stop the whole batch.
    """
    try:
        vector = client.embeddings.create(model=EMBEDDING_MODEL, input=text).data[0].embedding
    except Exception as e:
        logging.error(f"Failed to create embedding: {e}")
        vector = None
    return vector

# Load the Parquet file -- change the path below to point at your copy
parquet_file = pq.ParquetFile('data/amazon_fashion_clean_051624.parquet')

# Rows read from the Parquet file per iteration
batch_size = 1000

# Records accumulated before each Pinecone upsert; kept at 200 because every
# record carries a large embedding plus metadata
pinecone_batch_size = 200
upsert_buffer = []  # records waiting to be sent to Pinecone

def _or_default(value, default):
    """Return ``default`` when ``value`` is None or a scalar NaN, else ``value``."""
    if value is None:
        return default
    try:
        return default if pd.isna(value) else value
    except (TypeError, ValueError):
        # pd.isna returns an array for list-like values; treat those as present
        return value


def process_row(row):
    """Build a Pinecone upsert record for one product row, or return None.

    Parameters
    ----------
    row : pandas.Series
        One row of the product DataFrame (as yielded by ``DataFrame.iterrows``).

    Returns
    -------
    dict or None
        ``{"id", "values", "metadata"}`` when the row's ``parent_asin`` is in
        ``target_product_ids`` and the embedding succeeded; ``None`` otherwise.
    """
    parent_asin = row.get('parent_asin', 'unknown_id')
    if parent_asin not in target_product_ids:
        return None

    # Missing values arrive as None (or NaN for numeric columns). Normalise them so
    # the embedded text never contains the literal string "None" and the metadata
    # sent to Pinecone contains no null/NaN values.
    title = _or_default(row.get('title', ''), '')
    description = _or_default(row.get('description', ''), '')
    store = _or_default(row.get('store', 'N/A'), 'N/A')
    average_rating = _or_default(row.get('average_rating', 0.0), 0.0)

    text = f"{title}: {description}"
    # Keep at most the first 300 whitespace-separated words; this is only an
    # approximation of a token budget, not an exact token count.
    truncated_text = " ".join(text.split()[:300])

    embedding = create_embedding(truncated_text)
    if embedding is None:
        return None  # create_embedding already logged the failure

    return {
        "id": parent_asin,
        "values": embedding,
        "metadata": {
            "title": title,
            "store": store,
            "average_rating": average_rating
        }
    }

def upsert_to_pinecone(batch, retry_attempts=3):
    """Upsert ``batch`` into the module-level ``pinecone_index`` with retries.

    Parameters
    ----------
    batch : list of dict
        Upsert records of the form ``{"id", "values", "metadata"}``.
    retry_attempts : int, default 3
        Maximum number of upsert attempts; waits 1s, 2s, ... between attempts.

    Returns
    -------
    bool
        True if the batch was upserted (or was empty), False if every attempt
        failed. A final error is logged so dropped vectors are never silent.
    """
    if not batch:
        return True  # nothing to send
    for attempt in range(retry_attempts):
        try:
            pinecone_index.upsert(vectors=batch)
            logging.info("Upsert batch successful")
            return True
        except Exception as e:
            logging.error(f"Upsert failed: {e}, retrying ({attempt+1}/{retry_attempts})")
            # Exponential backoff, but don't wait after the last attempt
            if attempt + 1 < retry_attempts:
                time.sleep(2 ** attempt)
    logging.error(f"Giving up on a batch of {len(batch)} vectors after {retry_attempts} attempts")
    return False


In [None]:
# Embed rows in parallel and upsert them to Pinecone in chunks of
# `pinecone_batch_size`. The `with` block shuts the worker threads down when the
# loop finishes or is interrupted.
# NOTE(review): 200 concurrent requests may exceed your OpenAI rate limit; lower
# max_workers if create_embedding logs rate-limit errors.
with concurrent.futures.ThreadPoolExecutor(max_workers=200) as executor:
    # Iterator to process the parquet file in batches
    for batch in parquet_file.iter_batches(batch_size=batch_size):
        df = batch.to_pandas()
        # Only rows that still need indexing are worth a thread; process_row
        # returns None for every other row anyway.
        df = df[df['parent_asin'].isin(target_product_ids)]
        futures = [executor.submit(process_row, row) for _, row in df.iterrows()]

        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            if result is None:
                continue
            upsert_buffer.append(result)

            # Flush once the buffer reaches the Pinecone batch size
            if len(upsert_buffer) >= pinecone_batch_size:
                upsert_to_pinecone(upsert_buffer)
                upsert_buffer = []

# Upsert any remaining items in the buffer
upsert_to_pinecone(upsert_buffer)

### Image embedding

- This part is a little more complicated because this demo code runs locally in a PyTorch CUDA environment.

- Install the `sentence_transformers` package and PyTorch to match your own computer or working environment.

In [None]:
import ijson
import json
import numpy as np
import cv2 ### ! pip install opencv-python-headless
import urllib.request
from PIL import Image
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
import torch
from pinecone import Pinecone


In [None]:
# PyTorch version installed in this environment
torch.__version__

'2.3.0'

In [None]:
# Check whether a CUDA GPU is visible. current_device() raises when none is
# available, so show a message instead of an exception on CPU-only machines.
(
    torch.cuda.get_device_name(torch.cuda.current_device())
    if torch.cuda.is_available()
    else 'No CUDA GPU available; the model will run on the CPU'
)

'NVIDIA GeForce RTX 4090'

In [None]:
# Run on the GPU when one is available, otherwise on the CPU
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# CLIP ViT-B/32 checkpoint from sentence-transformers, used below to embed product images
model = SentenceTransformer('clip-ViT-B-32')
model.to(device)

SentenceTransformer(
  (0): CLIPModel()
)

### First, create a list of image URLs from the Parquet file and store it in a JSON file so it can be processed cleanly later

In [None]:
import pyarrow.parquet as pq
import pandas as pd
import json

import os

# Path to your Parquet file
parquet_file_path = 'data/amazon_fashion_clean_051624.parquet'
parquet_file = pq.ParquetFile(parquet_file_path)

# Batch size for reading
batch_size = 20000

# Only these columns are needed; skipping the rest (notably the wide `details`
# struct) keeps every batch small and fast to decode.
needed_columns = ['parent_asin', 'title', 'images']

# Product details keyed by parent_asin; a later row with the same parent_asin
# overwrites an earlier one.
product_details = {}

# Iterator to go through the Parquet file in batches
iterator = parquet_file.iter_batches(batch_size=batch_size, columns=needed_columns)

batch_count = 0

for batch in iterator:
    df = batch.to_pandas()

    # First 'large' image URL per product; None when the image list is empty or null
    df['first_large_image'] = df['images'].apply(
        lambda imgs: imgs[0]['large'] if imgs is not None and len(imgs) > 0 else None
    )

    # Zipping columns avoids DataFrame.iterrows, which is very slow on ~780k rows
    for asin, title, image_link in zip(df['parent_asin'], df['title'], df['first_large_image']):
        product_details[asin] = {'title': title, 'image_link': image_link}

    batch_count += 1
    print(f'Processed batch {batch_count}')

products_list = [
    {"id": asin, "title": details['title'], "image_link": details['image_link']}
    for asin, details in product_details.items()
]

# Path where you want to save the JSON file
json_file_path = 'output/product_details_array.json' ## modify your file name
# Create the output directory if needed so open() does not fail on a fresh checkout
os.makedirs(os.path.dirname(json_file_path) or '.', exist_ok=True)

# Write the list of product records to a JSON file
with open(json_file_path, 'w', encoding='utf-8') as json_file:
    json.dump(products_list, json_file, indent=4)

print(f'Product details have been successfully written to {json_file_path}')

This output file is passed to the process below, which creates CLIP embeddings and uploads them to Pinecone.

In [None]:
def fetch_images(urls, keep_failed=False, timeout=30):
    """Download and decode images from a list of URLs.

    Parameters
    ----------
    urls : iterable of str or None
        Image URLs; empty/``None`` entries (products without an image) are skipped.
    keep_failed : bool, default False
        When False (the original behaviour) failed URLs are left out, so the
        result can be shorter than ``urls``. When True a ``None`` placeholder is
        appended for every failure, so ``result[i]`` always corresponds to ``urls[i]``.
    timeout : float, default 30
        Seconds to wait for each HTTP request before giving up on that URL.

    Returns
    -------
    list
        RGB ``PIL.Image`` objects (plus ``None`` placeholders if ``keep_failed``).
    """
    images = []
    for url in tqdm(urls):
        image = None
        if not url:
            print(f"Skipping empty image URL: {url!r}")
        else:
            try:
                # `with` closes the HTTP response even if reading fails
                with urllib.request.urlopen(url, timeout=timeout) as resp:
                    raw = np.asarray(bytearray(resp.read()), dtype="uint8")
                decoded = cv2.imdecode(raw, cv2.IMREAD_COLOR)
                if decoded is None:
                    # imdecode signals undecodable bytes by returning None, not by raising
                    raise ValueError("could not decode image data")
                image = Image.fromarray(cv2.cvtColor(decoded, cv2.COLOR_BGR2RGB))
            except Exception as e:
                print(f"Failed to load {url}: {str(e)}")
        if image is not None or keep_failed:
            images.append(image)
    return images


def upsert_to_pinecone(pinecone_index, batch, retries=3):
    """Upsert ``batch`` into ``pinecone_index``, retrying with exponential backoff.

    NOTE: this redefines the text pipeline's ``upsert_to_pinecone(batch)`` with a
    different signature; re-run that earlier cell before reusing the text pipeline.

    Returns True on success (or for an empty batch), False when all ``retries``
    attempts failed. Waits 2s, 4s, ... between attempts but not after the last one.
    """
    if not batch:
        return True
    for attempt in range(1, retries + 1):
        try:
            pinecone_index.upsert(vectors=batch)
            return True
        except Exception as e:
            print(f"Upsert failed on attempt {attempt}: {str(e)}")
            if attempt < retries:
                time.sleep(2 ** attempt)
    return False


def _embed_and_upsert(keys, urls, titles):
    """Fetch, CLIP-encode and upsert one batch; return the product IDs that were stored.

    Products whose image could not be fetched are dropped *before* encoding so
    that every embedding stays paired with its own product ID and title. Uses
    the module-level ``model`` and ``pinecone_index``.
    """
    images = fetch_images(urls, keep_failed=True)
    kept = [
        (key, image, title)
        for key, image, title in zip(keys, images, titles)
        if image is not None
    ]
    if not kept:
        return []
    embeddings = model.encode(
        [image for _, image, _ in kept],
        batch_size=len(kept),
        convert_to_tensor=True,
        show_progress_bar=True,
    )
    batch = [
        {
            "id": key,
            "values": embedding.cpu().numpy().tolist(),
            # Titles can be null in the JSON; use '' so metadata has no null values
            "metadata": {"title": title if title is not None else ""},
        }
        for (key, _, title), embedding in zip(kept, embeddings)
    ]
    if upsert_to_pinecone(pinecone_index, batch):
        return [key for key, _, _ in kept]
    return []


def process_images_in_batches(json_file, batch_size=10, progress_file='progress_record.json'):
    """Stream products from ``json_file``, embed their images and upsert them to Pinecone.

    Parameters
    ----------
    json_file : str
        Path to a JSON array of ``{"id", "title", "image_link"}`` records (as
        written by the export cell above), read incrementally with ``ijson``.
    batch_size : int, default 10
        Number of not-yet-processed products fetched and encoded per batch.
    progress_file : str, default 'progress_record.json'
        JSON list of product IDs already upserted. It is read at start-up so an
        interrupted run resumes where it left off, and rewritten after every
        successful batch. Products whose image failed to load are not recorded,
        so they are retried on the next run.
    """
    processed_keys = set()

    # Load progress if exists
    try:
        with open(progress_file, 'r') as file:
            processed_keys = set(json.load(file))
    except FileNotFoundError:
        print("No progress file found. Starting fresh.")

    def _flush(keys, urls, titles):
        # Embed/upsert one batch and persist progress for the IDs actually stored
        stored = _embed_and_upsert(keys, urls, titles)
        if stored:
            processed_keys.update(stored)
            with open(progress_file, 'w') as f:
                json.dump(list(processed_keys), f)
        return stored

    urls, keys, titles = [], [], []
    with open(json_file, 'rb') as file:
        for item in ijson.items(file, 'item'):
            if item['id'] in processed_keys:
                continue
            urls.append(item['image_link'])
            keys.append(item['id'])
            titles.append(item['title'])

            if len(keys) >= batch_size:
                _flush(keys, urls, titles)
                urls, keys, titles = [], [], []

    if keys:
        stored = _flush(keys, urls, titles)
        print(f"Upserted final batch of size {len(stored)}")

In [None]:
# Embed and upsert product images 128 at a time, resuming from any saved progress
process_images_in_batches(json_file='output/product_details_array.json', batch_size=128)