In [1]:
# Cell 1: Load the data
import pandas as pd

df = pd.read_csv("../data/knowledge_base.csv")

# Drop any rows that might have an empty knowledge_doc, just in case
df.dropna(subset=['knowledge_doc'], inplace=True)

# Take a smaller sample to work with for this comparison exercise
sample_df = df.sample(1000, random_state=42)

print(f"Loaded {len(sample_df)} sample documents.")
sample_df

Loaded 1000 sample documents.


Unnamed: 0,parent_asin,knowledge_doc
534528,B000V4DOY4,Product Title: iPhone 8 Lightning Adapter Head...
117607,B007WPHXA6,Product Title: Gear4 Angry Birds SpaceTouch Ca...
1490858,B0BN8DDG2Z,Product Title: Surface Pro 9 Pro 8 Docking Sta...
1411203,B09W5VRNDZ,Product Title: DriSentri 120W Speaker 3 Way Au...
1602348,B07VHL6G2H,Product Title: USB Bluetooth 4.0 Adapter Dongl...
...,...,...
266385,B08NF1JGNJ,"Product Title: USB C to USB C Cable, Anwaut 6...."
179185,B01MT5YSBW,"Product Title: Acer Aspire Desktop, 7th Gen In..."
1271755,B08TZSRKB5,Product Title: iPhone Cable，MFi Certified Ligh...
400631,B07CNG9TBM,"Product Title: Bliiq Bluetooth Headphones, HiF..."


In [4]:
# Calculate the length of each document
df['doc_length'] = df['knowledge_doc'].str.len()

# Get the 5 longest documents
longest_docs_df = df.nlargest(50, 'doc_length')

longest_docs_df

Unnamed: 0,parent_asin,knowledge_doc,doc_length
49516,B002LSI1LO,Product Title: EOS 7D 18MP Digital SLR Camera\...,51123
528193,B006ZTVHD4,Product Title: Canon EOS C300 Cinema EOS Camco...,50401
552947,B00UB8QO42,Product Title: Canon PowerShot G7 X Digital Ca...,32423
2205,B00852VU86,Product Title: CLS110RG-W7AE\nBrand: JL AUDIO\...,21114
1416569,B002TH2XO8,Product Title: 13W3v3-4\nBrand: JL AUDIO\nDesc...,18294
714908,B001UDM4GE,Product Title: 13W3v3-2\nBrand: JL AUDIO\nDesc...,18290
248478,B0000511YT,Product Title: Tripp Lite Hospital-Grade Isoba...,16753
203208,B00MH5GYI8,"Product Title: JL Audio ACS112LG-TW1 12"" 12TW1...",15784
1311756,B00IF9M4KM,Product Title: JL Audio HO110-W6v3 Ported H.O....,15764
1565405,B00B87NEFC,Product Title: Tripp Lite CPU Wall/Desk Mount ...,15523


In [7]:
len(df)

1610012

In [6]:
import pandas as pd

# This assumes 'df' is a pandas DataFrame that has been loaded
# with an 'asin' column.

# The ASIN to search for
target_asin = 'B08BHHSB6M'

# Find the row(s) with the matching ASIN
# The 'asin' column might have leading/trailing spaces, so we use .str.strip()
result_row = df[df['parent_asin'].str.strip() == target_asin]

# Print the result
if not result_row.empty:
    print(f"Found row with ASIN: {target_asin}\n")
    # Print the full content of the row
    for index, row in result_row.iterrows():
        print(row.to_string())
else:
    print(f"No row found with ASIN: {target_asin}")

No row found with ASIN: B08BHHSB6M


In [9]:
# Cell 2: Generate embeddings with local model
from sentence_transformers import SentenceTransformer

# Load a pre-trained, all-around model. This will be downloaded once.
local_model = SentenceTransformer('all-MiniLM-L6-v2')

# Get the list of documents to embed
documents = sample_df['knowledge_doc'].tolist()

print("Generating embeddings with the local model...")
local_embeddings = local_model.encode(documents, show_progress_bar=True)

print(f"Embeddings generated. Shape: {local_embeddings.shape}")
# The output shape will be (1000, 384), meaning 1000 vectors, each with 384 dimensions.

Generating embeddings with the local model...


Batches:   0%|          | 0/32 [00:00<?, ?it/s]

Embeddings generated. Shape: (1000, 384)


In [10]:
# Cell 3: Generate embeddings with Gemini API
import os
import google.generativeai as genai
from dotenv import load_dotenv
import numpy as np

load_dotenv()

# Configure the API key
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))

print("Generating embeddings with the Gemini API...")
# The 'text-embedding-004' model is Google's latest embedding model
# Note: The API has rate limits, so we send documents in batches.
result = genai.embed_content(
    model="models/text-embedding-004",
    content=documents,
    task_type="RETRIEVAL_DOCUMENT" # Important: Specify the task type
)

gemini_embeddings = result['embedding']

print(f"Embeddings generated. Shape: {np.array(gemini_embeddings).shape}")
# The output shape will be (1000, 768), meaning 1000 vectors, each with 768 dimensions.

Generating embeddings with the Gemini API...
Embeddings generated. Shape: (1000, 768)


In [21]:
import pandas as pd
import numpy as np
import os
import asyncio
import google.generativeai as genai
from pinecone import Pinecone, ServerlessSpec
from dotenv import load_dotenv
from tqdm.asyncio import tqdm
import time
import sys

# ======================================================================================
# 1. CONFIGURATION
# ======================================================================================
# Make sure your .env file is in the project root, and you run this notebook from there.
load_dotenv()

# --- API Keys ---
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# --- Pinecone Config ---
INDEX_NAME = "electronics-gemini"
CLOUD_PROVIDER = "aws"
CLOUD_REGION = "us-east-1"
# Pinecone's official limit is 2MB per request. We'll use a slightly smaller
# value to be safe and account for overhead.
MAX_PINECONE_REQUEST_SIZE_BYTES = 2 * 1024 * 1024 * 0.9

# --- Gemini Config ---
# The Gemini API also has size limits. We'll create text batches that are
# roughly under a safe size limit to avoid errors. 1MB is a safe starting point.
MAX_GEMINI_REQUEST_SIZE_BYTES = 1 * 1024 * 1024
GEMINI_RPM_LIMIT = 60
DELAY_BETWEEN_REQUESTS = 60.0 / GEMINI_RPM_LIMIT

# --- Data Config ---
DATA_FILE_PATH = "../data/knowledge_base.csv"
NUM_DOCUMENTS_TO_PROCESS = 20000

# ======================================================================================
# 2. HELPER FUNCTIONS FOR ADAPTIVE BATCHING
# ======================================================================================

def create_adaptive_text_batches(documents_with_ids):
    """
    A generator that yields size-optimized batches of text documents for the Gemini API.
    """
    current_batch_docs = []
    current_batch_ids = []
    current_batch_size = 0

    for doc, doc_id in documents_with_ids:
        doc_size = sys.getsizeof(doc)

        if current_batch_size + doc_size > MAX_GEMINI_REQUEST_SIZE_BYTES and current_batch_docs:
            yield current_batch_docs, current_batch_ids
            current_batch_docs = []
            current_batch_ids = []
            current_batch_size = 0

        current_batch_docs.append(doc)
        current_batch_ids.append(doc_id)
        current_batch_size += doc_size

    if current_batch_docs:
        yield current_batch_docs, current_batch_ids

def create_adaptive_pinecone_batches(vectors_with_ids):
    """
    A generator that yields size-optimized batches for Pinecone upsert.
    """
    current_batch = []
    current_batch_size = 0

    for vec_with_id in vectors_with_ids:
        vector_size = sys.getsizeof(vec_with_id[1]) + sys.getsizeof(vec_with_id[0])

        if current_batch_size + vector_size > MAX_PINECONE_REQUEST_SIZE_BYTES and current_batch:
            yield current_batch
            current_batch = []
            current_batch_size = 0

        current_batch.append(vec_with_id)
        current_batch_size += vector_size

    if current_batch:
        yield current_batch

# ======================================================================================
# 3. ASYNC PIPELINE COMPONENTS
# ======================================================================================

async def producer_generate_embeddings(queue: asyncio.Queue, documents_with_ids: list):
    """
    An async "producer" that generates embeddings for adaptively sized batches
    of documents and puts them into a queue for the consumer.
    """
    print("Starting embedding generation with adaptive batches...")
    genai.configure(api_key=GEMINI_API_KEY)

    batch_generator = create_adaptive_text_batches(documents_with_ids)

    for batch_docs, batch_ids in tqdm(batch_generator, desc="Generating Embeddings"):
        try:
            result = await genai.embed_content_async(
                model="models/text-embedding-004",
                content=batch_docs,
                task_type="RETRIEVAL_DOCUMENT"
            )
            # Pair the generated embeddings with their original IDs
            embeddings_with_ids = list(zip(batch_ids, result['embedding']))
            await queue.put(embeddings_with_ids)

        except Exception as e:
            print(f"An error occurred during embedding generation: {e}")

        await asyncio.sleep(DELAY_BETWEEN_REQUESTS)

    await queue.put(None)
    print("Embedding generation finished.")


async def consumer_upsert_to_pinecone(queue: asyncio.Queue, index: Pinecone.Index):
    """
    An async "consumer" that gets generated embeddings, creates adaptive
    sub-batches, and upserts them to Pinecone.
    """
    print("Starting Pinecone ingestion with adaptive batches...")
    while True:
        embedding_batch = await queue.get()

        if embedding_batch is None:
            break

        try:
            # Create size-optimized sub-batches from the received embedding batch
            for adaptive_batch in create_adaptive_pinecone_batches(embedding_batch):
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(None, lambda: index.upsert(vectors=adaptive_batch))
        except Exception as e:
            print(f"An error occurred during Pinecone upsert: {e}")

        queue.task_done()

    print("Pinecone ingestion finished.")

# ======================================================================================
# 4. MAIN ORCHESTRATOR
# ======================================================================================

async def main():
    """
    Main async function to orchestrate the entire pipeline.
    """
    start_time = time.time()

    print(f"Loading data from {DATA_FILE_PATH}...")
    df = pd.read_csv(DATA_FILE_PATH).head(NUM_DOCUMENTS_TO_PROCESS)
    df.dropna(subset=['knowledge_doc'], inplace=True)
    # Create a list of (document, id) tuples to pass to the producer
    documents_with_ids = list(zip(df['knowledge_doc'], df['parent_asin'].astype(str)))
    print(f"Loaded {len(documents_with_ids)} documents to process.")

    print("Initializing Pinecone...")
    pc = Pinecone(api_key=PINECONE_API_KEY)
    if INDEX_NAME not in pc.list_indexes().names():
        print(f"Creating a new serverless index: {INDEX_NAME}")
        pc.create_index(
            name=INDEX_NAME, dimension=768, metric='cosine',
            spec=ServerlessSpec(cloud=CLOUD_PROVIDER, region=CLOUD_REGION)
        )
    index = pc.Index(INDEX_NAME)
    print("Pinecone initialized.")

    embedding_queue = asyncio.Queue(maxsize=10)

    producer_task = asyncio.create_task(producer_generate_embeddings(embedding_queue, documents_with_ids))
    consumer_task = asyncio.create_task(consumer_upsert_to_pinecone(embedding_queue, index))

    await asyncio.gather(producer_task, consumer_task)

    end_time = time.time()
    print(f"\nPipeline finished in {end_time - start_time:.2f} seconds.")
    print("Final index stats:")
    print(index.describe_index_stats())

# ======================================================================================
# 5. EXECUTE THE PIPELINE
# ======================================================================================
await main()


Loading data from ../data/knowledge_base.csv...
Loaded 20000 documents to process.
Initializing Pinecone...
Pinecone initialized.
Starting embedding generation with adaptive batches...


Generating Embeddings: 0it [00:00, ?it/s]

Starting Pinecone ingestion with adaptive batches...


Generating Embeddings: 29it [04:16,  8.85s/it]


Embedding generation finished.
Pinecone ingestion finished.

Pipeline finished in 270.33 seconds.
Final index stats:
{'dimension': 768,
 'index_fullness': 0.0,
 'metric': 'cosine',
 'namespaces': {'': {'vector_count': 40000}},
 'total_vector_count': 40000,
 'vector_type': 'dense'}


In [23]:
import pandas as pd
import numpy as np
import os
import asyncio
import google.generativeai as genai
from pinecone import Pinecone, ServerlessSpec
from dotenv import load_dotenv
from tqdm.asyncio import tqdm
import time
import sys

# ======================================================================================
# 1. CONFIGURATION
# ======================================================================================
# Make sure your .env file is in the project root, and you run this notebook from there.
load_dotenv()

# --- API Keys ---
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# --- Pinecone Config ---
INDEX_NAME = "electronics-gemini"
CLOUD_PROVIDER = "aws"
CLOUD_REGION = "us-east-1"
MAX_PINECONE_REQUEST_SIZE_BYTES = 2 * 1024 * 1024 * 0.9

# --- Gemini Config ---
MAX_GEMINI_REQUEST_SIZE_BYTES = 1 * 1024 * 1024
GEMINI_RPM_LIMIT = 60
DELAY_BETWEEN_REQUESTS = 60.0 / GEMINI_RPM_LIMIT

# --- Data Config ---
DATA_FILE_PATH = "../data/knowledge_base.csv"
NUM_DOCUMENTS_TO_PROCESS = 20000

# --- Concurrency Config ---
# We will create multiple "workers" to make API calls concurrently
NUM_PRODUCER_WORKERS = 5  # Number of parallel tasks calling the Gemini API
NUM_CONSUMER_WORKERS = 5  # Number of parallel tasks upserting to Pinecone

# ======================================================================================
# 2. HELPER FUNCTIONS FOR ADAPTIVE BATCHING
# ======================================================================================

def create_adaptive_text_batches(documents_with_ids):
    """
    A generator that yields size-optimized batches of text documents for the Gemini API.
    """
    current_batch_docs = []
    current_batch_ids = []
    current_batch_size = 0

    for doc, doc_id in documents_with_ids:
        doc_size = sys.getsizeof(doc)

        if current_batch_size + doc_size > MAX_GEMINI_REQUEST_SIZE_BYTES and current_batch_docs:
            yield current_batch_docs, current_batch_ids
            current_batch_docs = []
            current_batch_ids = []
            current_batch_size = 0

        current_batch_docs.append(doc)
        current_batch_ids.append(doc_id)
        current_batch_size += doc_size

    if current_batch_docs:
        yield current_batch_docs, current_batch_ids

def create_adaptive_pinecone_batches(vectors_with_ids):
    """
    A generator that yields size-optimized batches for Pinecone upsert.
    """
    current_batch = []
    current_batch_size = 0

    for vec_with_id in vectors_with_ids:
        vector_size = sys.getsizeof(vec_with_id[1]) + sys.getsizeof(vec_with_id[0])

        if current_batch_size + vector_size > MAX_PINECONE_REQUEST_SIZE_BYTES and current_batch:
            yield current_batch
            current_batch = []
            current_batch_size = 0

        current_batch.append(vec_with_id)
        current_batch_size += vector_size

    if current_batch:
        yield current_batch

# ======================================================================================
# 3. ASYNC PIPELINE COMPONENTS
# ======================================================================================

async def producer_worker(worker_id: int, queue: asyncio.Queue, documents_with_ids: list):
    """
    A single producer worker that generates embeddings for its assigned share of documents.
    """
    print(f"[Producer-{worker_id}] Starting...")
    genai.configure(api_key=GEMINI_API_KEY)

    batch_generator = create_adaptive_text_batches(documents_with_ids)

    for batch_docs, batch_ids in batch_generator:
        try:
            result = await genai.embed_content_async(
                model="models/text-embedding-004",
                content=batch_docs,
                task_type="RETRIEVAL_DOCUMENT"
            )
            embeddings_with_ids = list(zip(batch_ids, result['embedding']))
            await queue.put(embeddings_with_ids)

        except Exception as e:
            print(f"[Producer-{worker_id}] Error: {e}")

        await asyncio.sleep(DELAY_BETWEEN_REQUESTS)

    print(f"[Producer-{worker_id}] Finished.")


async def consumer_worker(worker_id: int, queue: asyncio.Queue, index: Pinecone.Index):
    """
    A single consumer worker that gets embeddings from the queue and upserts them.
    """
    print(f"[Consumer-{worker_id}] Starting...")
    while True:
        embedding_batch = await queue.get()

        if embedding_batch is None:
            # Re-add None to the queue for other consumers to see
            await queue.put(None)
            break

        try:
            for adaptive_batch in create_adaptive_pinecone_batches(embedding_batch):
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(None, lambda: index.upsert(vectors=adaptive_batch))
        except Exception as e:
            print(f"[Consumer-{worker_id}] Error: {e}")

        queue.task_done()

    print(f"[Consumer-{worker_id}] Finished.")

# ======================================================================================
# 4. MAIN ORCHESTRATOR
# ======================================================================================

async def main():
    """
    Main async function to orchestrate the multi-worker pipeline.
    """
    start_time = time.time()

    print(f"Loading data from {DATA_FILE_PATH}...")
    df = pd.read_csv(DATA_FILE_PATH).head(NUM_DOCUMENTS_TO_PROCESS)
    df.dropna(subset=['knowledge_doc'], inplace=True)
    all_documents_with_ids = list(zip(df['knowledge_doc'], df['parent_asin'].astype(str)))
    print(f"Loaded {len(all_documents_with_ids)} documents to process.")

    print("Initializing Pinecone...")
    pc = Pinecone(api_key=PINECONE_API_KEY)
    if INDEX_NAME not in pc.list_indexes().names():
        pc.create_index(
            name=INDEX_NAME, dimension=768, metric='cosine',
            spec=ServerlessSpec(cloud=CLOUD_PROVIDER, region=CLOUD_REGION)
        )
    index = pc.Index(INDEX_NAME)
    print("Pinecone initialized.")

    # The shared queue for all workers
    embedding_queue = asyncio.Queue(maxsize=20)

    # Create and start consumer workers first
    consumer_tasks = [
        asyncio.create_task(consumer_worker(i, embedding_queue, index))
        for i in range(NUM_CONSUMER_WORKERS)
    ]

    # Split the documents among the producer workers
    docs_per_worker = len(all_documents_with_ids) // NUM_PRODUCER_WORKERS
    producer_tasks = []
    for i in range(NUM_PRODUCER_WORKERS):
        start_index = i * docs_per_worker
        # Give the last worker all remaining documents
        end_index = (i + 1) * docs_per_worker if i < NUM_PRODUCER_WORKERS - 1 else len(all_documents_with_ids)
        worker_docs = all_documents_with_ids[start_index:end_index]
        task = asyncio.create_task(producer_worker(i, embedding_queue, worker_docs))
        producer_tasks.append(task)

    # Wait for all producers to finish
    await asyncio.gather(*producer_tasks)
    print("\nAll producer workers have finished.")

    # Once producers are done, signal to the consumers to shut down
    await embedding_queue.put(None)

    # Wait for all consumers to finish
    await asyncio.gather(*consumer_tasks)
    print("All consumer workers have finished.")

    end_time = time.time()
    print(f"\nPipeline finished in {end_time - start_time:.2f} seconds.")
    print("Final index stats:")
    print(index.describe_index_stats())

# ======================================================================================
# 5. EXECUTE THE PIPELINE
# ======================================================================================
await main()


Loading data from ../data/knowledge_base.csv...
Loaded 20000 documents to process.
Initializing Pinecone...
Pinecone initialized.
[Consumer-0] Starting...
[Consumer-1] Starting...
[Consumer-2] Starting...
[Consumer-3] Starting...
[Consumer-4] Starting...
[Producer-0] Starting...
[Producer-1] Starting...
[Producer-2] Starting...
[Producer-3] Starting...
[Producer-4] Starting...
[Producer-3] Finished.
[Producer-4] Finished.
[Producer-2] Finished.
[Producer-0] Finished.
[Producer-1] Finished.

All producer workers have finished.
[Consumer-0] Finished.
[Consumer-3] Finished.
[Consumer-1] Finished.
[Consumer-4] Finished.
[Consumer-2] Finished.
All consumer workers have finished.

Pipeline finished in 80.72 seconds.
Final index stats:
{'dimension': 768,
 'index_fullness': 0.0,
 'metric': 'cosine',
 'namespaces': {'': {'vector_count': 40000}},
 'total_vector_count': 40000,
 'vector_type': 'dense'}


In [37]:
import pandas as pd
import numpy as np
import os
import asyncio
import google.generativeai as genai
from pinecone import Pinecone, ServerlessSpec
from dotenv import load_dotenv
from tqdm.asyncio import tqdm
import time
import sys

# ======================================================================================
# 1. CONFIGURATION
# ======================================================================================
# Make sure your .env file is in the project root.
load_dotenv()

# --- API Keys ---
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# --- Pinecone Config ---
INDEX_NAME = "electronics-gemini"
CLOUD_PROVIDER = "aws"
CLOUD_REGION = "us-east-1"
MAX_PINECONE_REQUEST_SIZE_BYTES = 2 * 1024 * 1024 * 0.9

# --- Gemini Config ---
MAX_GEMINI_REQUEST_SIZE_BYTES = 1 * 1024 * 1024
# Set this to your account's RPM limit to calculate the delay.
# Updated to 120 RPM as requested.
GEMINI_RPM_LIMIT = 120

# --- Data Config ---
# Path is now relative to the notebook's location in the /notebooks directory
DATA_FILE_PATH = "../data/knowledge_base.csv"
NUM_DOCUMENTS_TO_PROCESS = 20000

# --- Concurrency Config ---
# We can keep a higher number of workers as the rate limiter will manage the API calls.
NUM_PRODUCER_WORKERS = 10
NUM_CONSUMER_WORKERS = 30

# ======================================================================================
# 2. ASYNC RATE LIMITER
# ======================================================================================

class AsyncRateLimiter:
    """
    A robust asynchronous rate limiter that enforces a specified number of calls
    per minute. It uses a token bucket-like approach to control the request rate
    across all concurrent workers.
    """
    def __init__(self, rate_limit: int, period_seconds: int = 60):
        self.rate_limit = rate_limit
        # Calculate the required delay between each request to not exceed the rate limit
        self.delay = period_seconds / rate_limit
        self._lock = asyncio.Lock()
        self._last_request_time = 0

    async def acquire(self):
        """Acquires a permit from the rate limiter, blocking if necessary."""
        async with self._lock:
            current_time = time.monotonic()
            time_since_last = current_time - self._last_request_time

            # If the time since the last request is less than the required delay,
            # wait for the remaining time.
            if time_since_last < self.delay:
                await asyncio.sleep(self.delay - time_since_last)

            # Update the time of the last request to the current time
            self._last_request_time = time.monotonic()


# ======================================================================================
# 3. HELPER FUNCTIONS FOR ADAPTIVE BATCHING
# ======================================================================================

def create_adaptive_text_batches(documents_with_ids):
    # ... (This function remains unchanged)
    current_batch_docs = []
    current_batch_ids = []
    current_batch_size = 0
    for doc, doc_id in documents_with_ids:
        doc_size = sys.getsizeof(doc)
        if current_batch_size + doc_size > MAX_GEMINI_REQUEST_SIZE_BYTES and current_batch_docs:
            yield current_batch_docs, current_batch_ids
            current_batch_docs, current_batch_ids, current_batch_size = [], [], 0
        current_batch_docs.append(doc)
        current_batch_ids.append(doc_id)
        current_batch_size += doc_size
    if current_batch_docs:
        yield current_batch_docs, current_batch_ids

def create_adaptive_pinecone_batches(vectors_with_ids):
    # ... (This function remains unchanged)
    current_batch = []
    current_batch_size = 0
    for vec_with_id in vectors_with_ids:
        vector_size = sys.getsizeof(vec_with_id[1]) + sys.getsizeof(vec_with_id[0])
        if current_batch_size + vector_size > MAX_PINECONE_REQUEST_SIZE_BYTES and current_batch:
            yield current_batch
            current_batch, current_batch_size = [], 0
        current_batch.append(vec_with_id)
        current_batch_size += vector_size
    if current_batch:
        yield current_batch

# ======================================================================================
# 4. ASYNC PIPELINE COMPONENTS
# ======================================================================================

async def producer_worker(worker_id: int, queue: asyncio.Queue, documents_with_ids: list, rate_limiter: AsyncRateLimiter):
    """
    A single producer worker that uses the shared rate limiter before making API calls.
    """
    start_time = time.time()
    print(f"[Producer-{worker_id}] Starting...")
    genai.configure(api_key=GEMINI_API_KEY)

    batch_generator = create_adaptive_text_batches(documents_with_ids)

    total_docs_processed = 0
    for batch_docs, batch_ids in batch_generator:
        try:
            # *** THIS IS THE FIX: Acquire a permit from the rate limiter ***
            await rate_limiter.acquire()

            result = await genai.embed_content_async(
                model="models/text-embedding-004",
                content=batch_docs,
                task_type="RETRIEVAL_DOCUMENT"
            )
            embeddings_with_ids = list(zip(batch_ids, result['embedding']))
            total_docs_processed += len(embeddings_with_ids)
            await queue.put(embeddings_with_ids)

        except Exception as e:
            print(f"[Producer-{worker_id}] Error: {e}")

    end_time = time.time()
    print(f"[Producer-{worker_id}] Finished. Processed {total_docs_processed} docs in {end_time - start_time:.2f} seconds.")


async def consumer_worker(worker_id: int, queue: asyncio.Queue, index: Pinecone.Index):
    # ... (This function remains unchanged)
    start_time = time.time()
    print(f"[Consumer-{worker_id}] Starting...")
    total_vectors_processed = 0
    while True:
        embedding_batch = await queue.get()
        if embedding_batch is None:
            await queue.put(None)
            break
        try:
            for i, adaptive_batch in enumerate(create_adaptive_pinecone_batches(embedding_batch)):
                batch_vector_count = len(adaptive_batch)
                total_vectors_processed += batch_vector_count
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(None, lambda: index.upsert(vectors=adaptive_batch))
        except Exception as e:
            print(f"[Consumer-{worker_id}] Error: {e}")
        queue.task_done()
    end_time = time.time()
    print(f"[Consumer-{worker_id}] Finished. Processed {total_vectors_processed} vectors in {end_time - start_time:.2f} seconds.")

# ======================================================================================
# 5. MAIN ORCHESTRATOR
# ======================================================================================

async def main():
    """
    Main async function to orchestrate the multi-worker pipeline.
    """
    start_time = time.time()

    print(f"Loading data from {DATA_FILE_PATH}...")
    df = pd.read_csv(DATA_FILE_PATH).head(NUM_DOCUMENTS_TO_PROCESS)
    df.dropna(subset=['knowledge_doc', 'parent_asin'], inplace=True)
    df['parent_asin'] = df['parent_asin'].astype(str)
    all_documents_with_ids = list(zip(df['knowledge_doc'], df['parent_asin']))
    print(f"Loaded {len(all_documents_with_ids)} documents to process.")

    print("Initializing Pinecone...")
    pc = Pinecone(api_key=PINECONE_API_KEY)

    if INDEX_NAME in pc.list_indexes().names():
        index = pc.Index(INDEX_NAME)
        print(f"Index '{INDEX_NAME}' already exists. Deleting all vectors to start fresh...")
        index.delete(delete_all=True)
        print("All vectors deleted.")
    else:
        print(f"Creating a new serverless index: {INDEX_NAME}")
        pc.create_index(
            name=INDEX_NAME, dimension=768, metric='cosine',
            spec=ServerlessSpec(cloud=CLOUD_PROVIDER, region=CLOUD_REGION)
        )

    index = pc.Index(INDEX_NAME)
    print("Pinecone initialized and ready.")

    # Create the shared resources: the queue and the new rate limiter
    embedding_queue = asyncio.Queue(maxsize=NUM_PRODUCER_WORKERS * 2)
    rate_limiter = AsyncRateLimiter(rate_limit=GEMINI_RPM_LIMIT, period_seconds=60)

    # --- Create and start workers ---
    consumer_tasks = [
        asyncio.create_task(consumer_worker(i, embedding_queue, index))
        for i in range(NUM_CONSUMER_WORKERS)
    ]

    docs_per_worker = len(all_documents_with_ids) // NUM_PRODUCER_WORKERS
    producer_tasks = []
    for i in range(NUM_PRODUCER_WORKERS):
        start_index = i * docs_per_worker
        end_index = (i + 1) * docs_per_worker if i < NUM_PRODUCER_WORKERS - 1 else len(all_documents_with_ids)
        worker_docs = all_documents_with_ids[start_index:end_index]
        if not worker_docs: continue
        # Pass the shared rate limiter to each producer worker
        task = asyncio.create_task(producer_worker(i, embedding_queue, worker_docs, rate_limiter))
        producer_tasks.append(task)

    # --- Wait for completion ---
    await asyncio.gather(*producer_tasks)
    print("\nAll producer workers have finished.")

    # Signal consumers to shut down
    await embedding_queue.put(None)

    await asyncio.gather(*consumer_tasks)
    print("All consumer workers have finished.")

    end_time = time.time()
    print(f"\nPipeline finished in {end_time - start_time:.2f} seconds.")

# ======================================================================================
# 6. EXECUTE THE PIPELINE
# ======================================================================================
await main()


Loading data from ../data/knowledge_base.csv...
Loaded 20000 documents to process.
Initializing Pinecone...
Index 'electronics-gemini' already exists. Deleting all vectors to start fresh...
All vectors deleted.
Pinecone initialized and ready.
[Consumer-0] Starting...
[Consumer-1] Starting...
[Consumer-2] Starting...
[Consumer-3] Starting...
[Consumer-4] Starting...
[Consumer-5] Starting...
[Consumer-6] Starting...
[Consumer-7] Starting...
[Consumer-8] Starting...
[Consumer-9] Starting...
[Consumer-10] Starting...
[Consumer-11] Starting...
[Consumer-12] Starting...
[Consumer-13] Starting...
[Consumer-14] Starting...
[Consumer-15] Starting...
[Consumer-16] Starting...
[Consumer-17] Starting...
[Consumer-18] Starting...
[Consumer-19] Starting...
[Consumer-20] Starting...
[Consumer-21] Starting...
[Consumer-22] Starting...
[Consumer-23] Starting...
[Consumer-24] Starting...
[Consumer-25] Starting...
[Consumer-26] Starting...
[Consumer-27] Starting...
[Consumer-28] Starting...
[Consumer-29]

In [38]:
import pandas as pd
import numpy as np
import os
import asyncio
import google.generativeai as genai
from pinecone import Pinecone, ServerlessSpec
from dotenv import load_dotenv
from tqdm.asyncio import tqdm
import time
import sys
import itertools

# ======================================================================================
# 1. CONFIGURATION
# ======================================================================================
# Make sure your .env file is in the project root.
load_dotenv()

# --- API Keys ---
# *** NEW: Load all available Gemini API keys from the .env file ***
GEMINI_API_KEYS = [os.getenv(key) for key in os.environ if key.startswith("GEMINI_API_KEY") and os.getenv(key)]
if not GEMINI_API_KEYS:
    raise ValueError("No GEMINI_API_KEY... variables found in .env file. Please add at least one.")
print(f"Found {len(GEMINI_API_KEYS)} Gemini API keys to use in the pool.")

PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")

# --- Pinecone Config ---
INDEX_NAME = "electronics-gemini"
CLOUD_PROVIDER = "aws"
CLOUD_REGION = "us-east-1"
MAX_PINECONE_REQUEST_SIZE_BYTES = 2 * 1024 * 1024 * 0.9

# --- Gemini Config ---
MAX_GEMINI_REQUEST_SIZE_BYTES = 1 * 1024 * 1024

# --- Data Config ---
DATA_FILE_PATH = "../data/knowledge_base.csv"
NUM_DOCUMENTS_TO_PROCESS = 20000

# --- Concurrency Config ---
# As requested, we will use 10 workers for each available API key.
WORKERS_PER_KEY = 10
NUM_PRODUCER_WORKERS = len(GEMINI_API_KEYS) * WORKERS_PER_KEY
NUM_CONSUMER_WORKERS = 30 # This can remain high

# ======================================================================================
# 2. HELPER FUNCTIONS FOR ADAPTIVE BATCHING
# ======================================================================================

def create_adaptive_text_batches(documents_with_ids):
    """
    A generator that yields size-optimized batches of text documents for the Gemini API.
    """
    current_batch_docs, current_batch_ids, current_batch_size = [], [], 0
    for doc, doc_id in documents_with_ids:
        doc_size = sys.getsizeof(doc)
        if current_batch_size + doc_size > MAX_GEMINI_REQUEST_SIZE_BYTES and current_batch_docs:
            yield current_batch_docs, current_batch_ids
            current_batch_docs, current_batch_ids, current_batch_size = [], [], 0
        current_batch_docs.append(doc)
        current_batch_ids.append(doc_id)
        current_batch_size += doc_size
    if current_batch_docs:
        yield current_batch_docs, current_batch_ids

def create_adaptive_pinecone_batches(vectors_with_ids):
    """
    A generator that yields size-optimized batches for Pinecone upsert.
    """
    current_batch, current_batch_size = [], 0
    for vec_with_id in vectors_with_ids:
        vector_size = sys.getsizeof(vec_with_id[1]) + sys.getsizeof(vec_with_id[0])
        if current_batch_size + vector_size > MAX_PINECONE_REQUEST_SIZE_BYTES and current_batch:
            yield current_batch
            current_batch, current_batch_size = [], 0
        current_batch.append(vec_with_id)
        current_batch_size += vector_size
    if current_batch:
        yield current_batch

# ======================================================================================
# 3. ASYNC PIPELINE COMPONENTS
# ======================================================================================

async def producer_worker(worker_id: int, api_key: str, queue: asyncio.Queue, documents_with_ids: list):
    """
    A single producer worker that uses its own assigned API key.
    """
    start_time = time.time()
    print(f"[Producer-{worker_id}] Starting (using key ending in ...{api_key[-4:]})")
    # Each worker configures with its own key
    genai.configure(api_key=api_key)

    batch_generator = create_adaptive_text_batches(documents_with_ids)

    total_docs_processed = 0
    for batch_docs, batch_ids in batch_generator:
        try:
            # A simple sleep per batch per worker is a safe way to respect RPM limits
            await asyncio.sleep(1)
            result = await genai.embed_content_async(
                model="models/text-embedding-004",
                content=batch_docs,
                task_type="RETRIEVAL_DOCUMENT"
            )
            embeddings_with_ids = list(zip(batch_ids, result['embedding']))
            total_docs_processed += len(embeddings_with_ids)
            await queue.put(embeddings_with_ids)

        except Exception as e:
            print(f"[Producer-{worker_id}] Error: {e}")

    end_time = time.time()
    print(f"[Producer-{worker_id}] Finished. Processed {total_docs_processed} docs in {end_time - start_time:.2f} seconds.")


async def consumer_worker(worker_id: int, queue: asyncio.Queue, index: Pinecone.Index):
    """
    A single consumer worker that gets embeddings from the queue and upserts them.
    """
    start_time = time.time()
    print(f"[Consumer-{worker_id}] Starting...")
    total_vectors_processed = 0
    while True:
        embedding_batch = await queue.get()
        if embedding_batch is None:
            await queue.put(None)
            break
        try:
            for i, adaptive_batch in enumerate(create_adaptive_pinecone_batches(embedding_batch)):
                total_vectors_processed += len(adaptive_batch)
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(None, lambda: index.upsert(vectors=adaptive_batch))
        except Exception as e:
            print(f"[Consumer-{worker_id}] Error: {e}")
        queue.task_done()
    end_time = time.time()
    print(f"[Consumer-{worker_id}] Finished. Processed {total_vectors_processed} vectors in {end_time - start_time:.2f} seconds.")

# ======================================================================================
# 4. MAIN ORCHESTRATOR
# ======================================================================================

async def main():
    """
    Main async function to orchestrate the multi-key, multi-worker pipeline.
    """
    start_time = time.time()

    print(f"Loading data from {DATA_FILE_PATH}...")
    df = pd.read_csv(DATA_FILE_PATH).head(NUM_DOCUMENTS_TO_PROCESS)
    df.dropna(subset=['knowledge_doc', 'parent_asin'], inplace=True)
    df['parent_asin'] = df['parent_asin'].astype(str)
    all_documents_with_ids = list(zip(df['knowledge_doc'], df['parent_asin']))
    print(f"Loaded {len(all_documents_with_ids)} documents to process.")

    print("Initializing Pinecone...")
    pc = Pinecone(api_key=PINECONE_API_KEY)

    if INDEX_NAME in pc.list_indexes().names():
        index = pc.Index(INDEX_NAME)
        index.delete(delete_all=True)
    else:
        pc.create_index(
            name=INDEX_NAME, dimension=768, metric='cosine',
            spec=ServerlessSpec(cloud=CLOUD_PROVIDER, region=CLOUD_REGION)
        )

    index = pc.Index(INDEX_NAME)
    print("Pinecone initialized and ready.")

    embedding_queue = asyncio.Queue(maxsize=NUM_PRODUCER_WORKERS * 2)

    consumer_tasks = [
        asyncio.create_task(consumer_worker(i, embedding_queue, index))
        for i in range(NUM_CONSUMER_WORKERS)
    ]

    # Create a cycle of the available API keys for round-robin assignment
    key_cycle = itertools.cycle(GEMINI_API_KEYS)

    # Distribute the documents evenly among the total number of producer workers
    docs_per_worker = len(all_documents_with_ids) // NUM_PRODUCER_WORKERS
    producer_tasks = []
    for i in range(NUM_PRODUCER_WORKERS):
        start_index = i * docs_per_worker
        end_index = (i + 1) * docs_per_worker if i < NUM_PRODUCER_WORKERS - 1 else len(all_documents_with_ids)
        worker_docs = all_documents_with_ids[start_index:end_index]
        if not worker_docs: continue

        # Assign an API key from the pool to this worker
        api_key = next(key_cycle)
        task = asyncio.create_task(producer_worker(i, api_key, embedding_queue, worker_docs))
        producer_tasks.append(task)

    await asyncio.gather(*producer_tasks)
    print("\nAll producer workers have finished.")

    await embedding_queue.put(None)

    await asyncio.gather(*consumer_tasks)
    print("All consumer workers have finished.")

    end_time = time.time()
    print(f"\nPipeline finished in {end_time - start_time:.2f} seconds.")

# ======================================================================================
# 5. EXECUTE THE PIPELINE
# ======================================================================================
await main()


Found 2 Gemini API keys to use in the pool.
Loading data from ../data/knowledge_base.csv...
Loaded 20000 documents to process.
Initializing Pinecone...
Pinecone initialized and ready.
[Consumer-0] Starting...
[Consumer-1] Starting...
[Consumer-2] Starting...
[Consumer-3] Starting...
[Consumer-4] Starting...
[Consumer-5] Starting...
[Consumer-6] Starting...
[Consumer-7] Starting...
[Consumer-8] Starting...
[Consumer-9] Starting...
[Consumer-10] Starting...
[Consumer-11] Starting...
[Consumer-12] Starting...
[Consumer-13] Starting...
[Consumer-14] Starting...
[Consumer-15] Starting...
[Consumer-16] Starting...
[Consumer-17] Starting...
[Consumer-18] Starting...
[Consumer-19] Starting...
[Consumer-20] Starting...
[Consumer-21] Starting...
[Consumer-22] Starting...
[Consumer-23] Starting...
[Consumer-24] Starting...
[Consumer-25] Starting...
[Consumer-26] Starting...
[Consumer-27] Starting...
[Consumer-28] Starting...
[Consumer-29] Starting...
[Producer-0] Starting (using key ending in ...