[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/pinecone-field/ecommerce-product-search/blob/main/data-pipeline/product-search-genai.ipynb)

# Install dependencies 
Use the following shell command to install the pinecone client and llama-index for data ingestion. This notebook uses:

1. pinecone-client - for vector db upserts and queries
2. python-dotenv - for setting environment variables for openai and pinecone
3. llama-index and llama-hub - data pipeline framework that ingests a 150K record csv into pinecone. Also provides query wrappers for pinecone and openai.

In [None]:
!pip install -U "pinecone-client[grpc]" "python-dotenv" "llama-index" "llama-hub" "cohere"
!pip freeze > requirements.txt

# Download the e-commerce product data set

In [None]:
import requests

# Raw-file URL of the Icecat e-commerce product CSV used by every later cell.
url = "https://github.com/pinecone-field/ecommerce-product-search/blob/main/data-pipeline/icecat_products.csv?raw=true"
# A timeout stops the cell from hanging forever on a stalled connection.
response = requests.get(url, timeout=60)
# Fail loudly on a 4xx/5xx response instead of silently saving an HTML error page as the CSV.
response.raise_for_status()
with open("icecat_products.csv", "wb") as file:
    file.write(response.content)
print("downloaded product dataset")

# Create Pinecone index 

In [12]:
from dotenv import load_dotenv
import os
import pinecone

from getpass import getpass

# Credentials come from a local .env file (or the existing environment);
# anything still missing is prompted for interactively.
loaded = load_dotenv('.env')

if loaded:
    print(".env file was successfully loaded")
else:
    print(".env file was not found or could not be loaded")


def _setting(name, prompt, secret=False):
    """Return environment variable `name`, prompting the user when it is unset or empty.

    Secrets are read with getpass so they are neither echoed nor saved in the cell output.
    """
    value = os.environ.get(name)
    if value:
        return value
    return getpass(prompt) if secret else input(prompt)


PINECONE_INDEX_NAME = _setting('PINECONE_INDEX_NAME', "Enter pinecone index name: ")
PINECONE_API_KEY = _setting('PINECONE_API_KEY', "Enter pinecone api key: ", secret=True)
PINECONE_ENVIRONMENT = _setting('PINECONE_ENVIRONMENT', "Enter pinecone environment: ")
OPENAI_API_KEY = _setting('OPENAI_API_KEY', "Enter openai api key: ", secret=True)
# The OpenAI LLM / embedding objects created later in the notebook are not passed an
# api_key, so they look it up from the environment; export a prompted key so they find it.
os.environ['OPENAI_API_KEY'] = OPENAI_API_KEY

METRIC = "cosine"
# Must equal the embedding width; assumes OpenAI text-embedding-ada-002 (1536-dim vectors).
DIMENSIONS = 1536
# Capacity settings, only used when the index has to be created.
PODS = 2
REPLICAS = 1
POD_TYPE = "p1.x1"

pinecone.init(api_key=PINECONE_API_KEY, environment=PINECONE_ENVIRONMENT)

if PINECONE_INDEX_NAME not in pinecone.list_indexes():
    pinecone.create_index(PINECONE_INDEX_NAME, dimension=DIMENSIONS, metric=METRIC, pods=PODS, replicas=REPLICAS, pod_type=POD_TYPE)
else:
    print(f"Index {PINECONE_INDEX_NAME} already exists")

print(f"Index Description: {pinecone.describe_index(name=PINECONE_INDEX_NAME)}")

# gRPC client used by the asyncio upsert cells below.
pinecone_index = pinecone.GRPCIndex(index_name=PINECONE_INDEX_NAME)

.env file was successfully loaded


KeyboardInterrupt: 

# Parse CSV into llama_index 

1. Load csv file
2. Attach metadata that can be used in pinecone queries
3. End result is a list of `nodes` (row text plus parsed metadata) that the cells below embed with OpenAI and upsert into Pinecone

In [None]:
from pathlib import Path
from llama_index.schema import TextNode
from llama_index.node_parser import SimpleNodeParser
from tqdm.auto import tqdm 
from datetime import datetime
from llama_index import download_loader

# Default llama_index node parser.
# NOTE(review): `parser` is not referenced by any of the cells shown here; nodes are
# built directly from the CSV-row documents further down.
parser = SimpleNodeParser.from_defaults()

# Fetch the PagedCSVReader loader from llama-hub and read the downloaded product CSV.
csv_path = Path('./icecat_products.csv')
PagedCSVReader = download_loader("PagedCSVReader")
loader = PagedCSVReader(encoding="utf-8")
documents = loader.load_data(file=csv_path)

# Unix epoch as a naive datetime. Subtracting it from another naive datetime
# measures that datetime as UTC, independent of the machine's local timezone.
_EPOCH = datetime(1970, 1, 1)


def get_metadata(docText):
    """Parse the ``key: value`` lines of one CSV-row document into Pinecone metadata.

    - Lines starting with ``price`` are stored as floats.
    - Lines starting with ``date_released`` (format ``%Y-%m-%dT%H:%M:%SZ``) are stored
      as the Unix timestamp (int) of midnight UTC on that date.
    - Lines starting with ``ean`` are dropped.
    - Every other ``key: value`` line is stored as a string.

    Lines that cannot be parsed are skipped so that one bad field does not drop the
    whole record. This covers lines with no ``": "`` separator, a non-numeric price,
    or a malformed date.

    Args:
        docText: document text with one ``key: value`` pair per line.

    Returns:
        dict mapping metadata keys to float, int or str values.
    """
    data_dict = {}
    for line in docText.strip().split('\n'):
        try:
            key, value = line.split(': ', 1)
        except ValueError:
            continue  # no "key: value" separator on this line

        if line.startswith('price'):
            try:
                data_dict[key] = float(value)
            except ValueError:
                pass  # non-numeric price: leave the field out
        elif line.startswith('date_released'):
            try:
                released = datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
            except ValueError:
                continue  # malformed date: leave the field out
            midnight = datetime(released.year, released.month, released.day)
            # The trailing 'Z' means UTC. A naive datetime's .timestamp() would instead
            # assume the host's local timezone and shift the value per machine.
            data_dict[key] = int((midnight - _EPOCH).total_seconds())
        elif not line.startswith('ean'):
            data_dict[key] = value
    return data_dict

# Wrap each CSV-row document in a TextNode carrying the metadata parsed from its text.
nodes = []
for document in tqdm(documents, total=len(documents)):
    text_node = TextNode(text=document.text)
    text_node.metadata = get_metadata(document.text)
    nodes.append(text_node)

# Spot-check the first node.
first_node = nodes[0]
print(first_node.text)
print(first_node.metadata)

# Generate keyword and question/answer examples for a subset of the data

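Running these LLM-based extractors over the full data set takes a long time and consumes a lot of tokens, so the cell below only processes 3 randomly sampled nodes. It is included mainly to show what llama_index's metadata extractors can add on top of a Pinecone index.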

In [None]:
from llama_index.node_parser.extractors import MetadataExtractor, QuestionsAnsweredExtractor, KeywordExtractor
from llama_index.llms import OpenAI
import random

llm = OpenAI(model="gpt-3.5-turbo")

# How many random nodes to enrich, and the seed that makes the sample reproducible.
SAMPLE_SIZE = 3
SAMPLE_SEED = 42

metadata_extractor = MetadataExtractor(
    extractors=[
        # `keywords` (not `nodes`) is the KeywordExtractor argument for the keyword count.
        KeywordExtractor(keywords=5, llm=llm),
        QuestionsAnsweredExtractor(questions=3, llm=llm),
    ],
    in_place=False,  # process copies instead of mutating `nodes` in place
)

# A dedicated seeded RNG avoids touching the global `random` state; min() guards
# against data sets smaller than SAMPLE_SIZE.
sample_nodes = random.Random(SAMPLE_SEED).sample(nodes, min(SAMPLE_SIZE, len(nodes)))
test_query_suggestions = metadata_extractor.process_nodes(sample_nodes)
for tqs in test_query_suggestions:
    print(f"id: {tqs.metadata['id']}")
    print(f"name: {tqs.metadata['name']}")
    print(f"title: {tqs.metadata['title']}")
    print(f"short_description: {tqs.metadata['short_description']}")
    print('-' * 100)
    print(f"keywords: {tqs.metadata['excerpt_keywords']}")
    print(f"questions this excerpt can answer:\n{tqs.metadata['questions_this_excerpt_can_answer']}\n")
  

# Generate vector embeddings for all records

1. This takes a long time to run (~20 minutes) because OpenAI batch embedding requests are slow
2. OpenAI embeddings are retrieved in batches using `get_text_embedding_batch()`
3. Batches are scheduled with asyncio so several requests can be in flight at once; because `get_text_embedding_batch()` is a blocking call, it has to run off the event loop (e.g. in a worker thread) for the batches to actually overlap

In [None]:
from llama_index.embeddings import OpenAIEmbedding
import asyncio

# Texts per OpenAI embedding request; the upsert cell below reuses the same batch size.
batch_size = 200
embed_model = OpenAIEmbedding(embed_batch_size=batch_size)
# Ceiling division via negated floor division: a trailing partial batch counts as one batch.
num_batches = -(-len(nodes) // batch_size)

async def process_batch(nodes, start, end, embed_model):
    """Embed ``nodes[start:end]`` with `embed_model` and store each vector on its node.

    ``get_text_embedding_batch`` is a blocking call, so it runs in the event loop's
    default thread-pool executor. Called directly, the coroutine would never yield,
    and ``asyncio.gather`` would execute the batches strictly one after another.

    Args:
        nodes: sequence of nodes exposing ``.text`` and an assignable ``.embedding``.
        start: index of the first node in the batch.
        end: index one past the last node in the batch.
        embed_model: object providing ``get_text_embedding_batch(list_of_str) -> list_of_vectors``.
    """
    batch = nodes[start:end]
    texts = [node.text for node in batch]

    loop = asyncio.get_running_loop()
    batch_embeddings = await loop.run_in_executor(None, embed_model.get_text_embedding_batch, texts)

    for node, embedding in zip(batch, batch_embeddings):
        node.embedding = embedding

    print(f"embeddings retrieved from OpenAI: {start} - {end}")
    
# Process batches asynchronously
async def process_batches(nodes):
    tasks = []
    for i in range(num_batches):
        start = i * batch_size
        end = min((i + 1) * batch_size, len(nodes))
        task = process_batch(nodes, start, end, embed_model)
        tasks.append(task)

    await asyncio.gather(*tasks)

await process_batches(nodes)

# Upsert vector embeddings+metadata into Pinecone for all records

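1. Uses asyncio to schedule one upsert request per batch; as with the embedding step, the blocking upsert call has to run off the event loop (e.g. in a worker thread) for the requests to overlap
2. Batches use the same `batch_size` as the embedding step above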

In [None]:
import asyncio

async def upsert_batch(nodes, start, end):
    """Upsert ``nodes[start:end]`` into `pinecone_index` as (id, embedding, metadata) tuples.

    The gRPC upsert call blocks, so it runs in the event loop's default thread-pool
    executor. Called directly, the coroutine would never yield, and ``gather`` would
    send the batches one after another.

    Raises:
        ValueError: if a node in the batch has no embedding yet. This happens when the
            embedding cell was not run or failed for that batch.
    """
    batch = nodes[start:end]
    missing = [node.node_id for node in batch if node.embedding is None]
    if missing:
        raise ValueError(
            f"{len(missing)} node(s) in batch {start} - {end} have no embedding, e.g. {missing[0]}"
        )

    vectors = [(node.node_id, node.embedding, node.metadata) for node in batch]

    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        lambda: pinecone_index.upsert(batch_size=batch_size, vectors=vectors, show_progress=False),
    )
    print(f"embeddings upserted to Pinecone: {start} - {end}")

async def upsert_batches(nodes):
    tasks = []
    for i in range(num_batches):
        start = i * batch_size
        end = min((i + 1) * batch_size, len(nodes))
        task = upsert_batch(nodes, start, end)
        tasks.append(task)

    await asyncio.gather(*tasks)

await upsert_batches(nodes)

# Same core functionality as asyncio approach [SKIP]

1. Limited to 10 concurrent connections (safest to use pool size of 10)
2. Runs in ~9 mins vs ~2 minutes for asyncio based approach
3. Included for completeness because the Pinecone docs recommend this approach for batched upserts

In [None]:
import itertools

# One (id, values, metadata) tuple per node, the shape passed to index.upsert below.
vectors = [(node.node_id, node.embedding, node.metadata) for node in nodes]

def chunks(iterable, batch_size=100):
    """Lazily yield consecutive tuples of up to `batch_size` items from `iterable`.

    The final tuple may be shorter; nothing is yielded for an empty iterable.
    """
    source = iter(iterable)
    next_chunk = lambda: tuple(itertools.islice(source, batch_size))
    # iter(callable, sentinel) keeps calling next_chunk until it returns the empty tuple.
    yield from iter(next_chunk, ())


# REST client with a 10-thread pool; async_req=True returns a pending-result handle immediately.
with pinecone.Index(PINECONE_INDEX_NAME, pool_threads=10) as index:
    async_results = []
    for vector_chunk in chunks(vectors, batch_size=100):
        async_results.append(index.upsert(vectors=vector_chunk, async_req=True))
    # Block until every request finishes; .get() raises if its request failed.
    for async_result in async_results:
        async_result.get()

In [None]:
# DANGER: uncommenting the next line deletes all vectors upserted above from the index.
#pinecone_index.delete(delete_all=True)

# [OPTIONAL] Uninstall all existing python packages in the runtime
This is a brute force way to make sure that the python runtime doesn't have any package/version conflicts. 

In [None]:
# Snapshot every installed package (overwriting the requirements.txt written by the
# install cell), then uninstall each listed package one at a time.
# WARNING: this also removes the packages this notebook depends on; re-run the install
# cell (and restart the runtime) before using the notebook again.
!pip freeze > requirements.txt
!cat requirements.txt | xargs -n 1 pip uninstall -y