In [1]:
# Banner marking the start of the notebook run
print("villam RAG chatbot setup")

villam RAG chatbot setup


In [2]:
import os 
from dotenv import load_dotenv  # to load API keys from .env file 

# Load the .env file (expected to define GOOGLE_API_KEY and PINECONE_API_KEY)
load_dotenv()

# Report only whether each key is present -- the key values are never printed.
# The outer f-string quotes are double quotes on purpose: reusing the same quote
# character inside the replacement field is a SyntaxError before Python 3.12.
print(f"GOOGLE_API_KEY Loaded: {bool(os.getenv('GOOGLE_API_KEY'))}")
print(f"PINECONE_API_KEY loaded: {bool(os.getenv('PINECONE_API_KEY'))}")

GOOGLE_API_KEY Loaded: True
PINECONE_API_KEY loaded: True


In [3]:
from langchain_community.document_loaders.csv_loader import CSVLoader

# Relative path of the Villam Hub dataset
csv_path = "villamhub_rag_dataset.csv"

# Build the loader. No content column is singled out in this call; the sample
# printed below shows the page content as "column: value" lines.
csv_loader = CSVLoader(
    file_path=csv_path,
    encoding="utf-8",
    source_column="source",      # column handed to the loader as the source column
    metadata_columns=["id"],     # column handed to the loader as a metadata column
)

# Read the CSV rows into LangChain Documents
documents = csv_loader.load()

# Quick sanity check: record count plus the first 200 characters of the first record
print(f"Loaded {len(documents)} records from {csv_path}")
print("Sample content:", documents[0].page_content[:200])


Loaded 20 records from villamhub_rag_dataset.csv
Sample content: text: Villam Hub is an urban farming platform focused on helping people grow food, plant trees, and live more sustainably. It combines digital technology with offline experiences.
source: VillamHub_Ov


In [4]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Function that splits the documents into chunks
def split_docs(documents, chunk_size=1500, chunk_overlap=100):
    """Split documents into smaller chunks.

    Args:
        documents: Documents to split; passed straight to ``split_documents``.
        chunk_size: ``chunk_size`` given to ``RecursiveCharacterTextSplitter``.
        chunk_overlap: ``chunk_overlap`` given to ``RecursiveCharacterTextSplitter``.

    Returns:
        The result of ``RecursiveCharacterTextSplitter.split_documents`` for the input.
    """
    splitter_options = {"chunk_size": chunk_size, "chunk_overlap": chunk_overlap}
    return RecursiveCharacterTextSplitter(**splitter_options).split_documents(documents)

# Chunk the loaded documents with the helper defined above
docs = split_docs(
    documents,
    chunk_size=1500,
    chunk_overlap=100,
)

# Report how many chunks came out
print(f"split into {len(docs)} chunks")

split into 20 chunks


In [5]:
from langchain_google_genai import GoogleGenerativeAIEmbeddings

# Set up the Google embedding model
embedding_model = GoogleGenerativeAIEmbeddings(model="models/embedding-001")

# Smoke test: embed a single sentence and report the vector length
# (the Pinecone index in this notebook is created with dimension=768)
test_text = "Villam Hub empowers urban farmers."
embedding = embedding_model.embed_query(test_text)

print(f"Length of vector: {len(embedding)}")

Length of vector: 768


In [6]:
import re  # for regex to extract numbers from error messages

# Helper function to extract wait time from API error messages if we get rate limited
def parse_retry_wait_time(error, default_wait=20):
    """
    Extract the retry delay, in whole seconds, suggested by an API error message.

    The message is searched (case-insensitively) for "retry in <number>" followed
    by a seconds unit: "seconds", "second", "secs", "sec" or "s". The number may
    carry a fractional part; it is rounded up so we never wait less than suggested.

    Examples:
        "Too many requests. Retry in 30 seconds" -> 30
        "Please retry in 12.5s."                 -> 13
        "Something else went wrong"              -> default_wait

    Args:
        error: The exception (or any object); ``str(error)`` is what gets searched.
        default_wait: Seconds to return when no delay is found. Defaults to 20.

    Returns:
        int: The suggested wait rounded up to a whole second, or ``default_wait``
        when the message contains no recognisable delay.
    """
    import math  # local import: keeps this cell runnable without touching other cells

    error_text = str(error)
    # Optional fraction, optional whitespace, then a seconds unit ending on a word
    # boundary so that e.g. "retry in 3 sessions" is not mistaken for a delay.
    match = re.search(
        r"retry in (\d+(?:\.\d+)?)\s*(?:seconds?|secs?|s)\b",
        error_text,
        re.IGNORECASE,
    )
    if match is None:
        return default_wait
    return math.ceil(float(match.group(1)))

In [7]:
from tqdm import tqdm
import time   # used to pause between retries if embedding fails

# Function to retry embedding in case of temporary errors like rate limits
def embed_batch_with_retry(embed_model, batch_contents, max_attempts=3):
    """
    Embed one batch of texts, trying again when the call raises.

    Each attempt calls ``embed_model.embed_documents(batch_contents)``. When an
    attempt raises, the error is printed and, unless it was the last attempt, the
    function sleeps for the number of seconds returned by
    ``parse_retry_wait_time`` for that error before trying again.

    Args:
        embed_model: Object exposing ``embed_documents(list_of_texts)``.
        batch_contents: The texts to embed in this call.
        max_attempts: Total number of attempts (not extra retries). Must be >= 1.

    Returns:
        Whatever ``embed_model.embed_documents`` returns on the first successful attempt.

    Raises:
        ValueError: If ``max_attempts`` is less than 1. Without this check the loop
            would never run and the function would silently return None.
        Exception: The error from the final attempt is re-raised unchanged.
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be at least 1, got {max_attempts}")

    for attempt in range(1, max_attempts + 1):
        try:
            return embed_model.embed_documents(batch_contents)
        except Exception as e:
            print(f"Attempt {attempt} failed: {e}")
            if attempt == max_attempts:
                # Out of attempts: surface the original error to the caller
                print("All attempts failed. Exiting.")
                raise
            # Use the delay suggested in the error message, or the helper's default
            wait_time = parse_retry_wait_time(e)
            print(f"Retrying in {wait_time} seconds...")
            time.sleep(wait_time)

# In short: the function sends a batch of text chunks to be embedded and, if the call fails, waits and tries again -- up to 3 attempts in total by default.

In [8]:
import concurrent.futures
# Function to embed all document chunks using concurrent threads for speed
def concurrent_embed_documents(embed_model, documents, batch_size=50, max_workers=4):
    """
    Embed `documents` batch by batch on a thread pool.

    The documents are cut into consecutive slices of `batch_size`; the
    `page_content` of each slice is submitted to `embed_batch_with_retry` on a
    pool of at most `max_workers` threads. Results are gathered in submission
    order while a progress bar is shown.

    A batch whose task raises is reported with a printed message and left out of
    both returned lists.

    Returns:
        (embeddings, texts): the collected vectors and the texts of the batches
        that were collected.
    """
    vectors = []   # embeddings gathered so far
    texts = []     # texts of the batches gathered so far
    pending = []   # (future, texts of that batch) in submission order

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Queue every batch up front so the workers can run side by side
        for start in range(0, len(documents), batch_size):
            chunk_texts = [doc.page_content for doc in documents[start:start + batch_size]]
            task = pool.submit(embed_batch_with_retry, embed_model, chunk_texts)
            pending.append((task, chunk_texts))

        # Collect the results, blocking on each task in turn
        for task, chunk_texts in tqdm(pending, total=len(pending), desc="Embedding batches"):
            try:
                vectors.extend(task.result())
                texts.extend(chunk_texts)
            except Exception as e:
                print(f"Batch failed: {e}")

    return vectors, texts

# The function breaks `documents` into batches (50 per batch by default) and embeds them in parallel threads (faster).
# Retrying is handled per batch by embed_batch_with_retry; a batch that still fails is printed and skipped. It returns all the vectors and their matching text.

In [9]:
# Usage: embed every chunk produced by the splitter
print("Generating embeddings for all document chunks...")
all_embeddings, all_batch_content = concurrent_embed_documents(
    embed_model=embedding_model,
    documents=docs,
)
print(f" Generated {len(all_embeddings)} embeddings.")


Generating embeddings for all document chunks...


Embedding batches: 100%|█████████████████████████████████████████████████████████████████| 1/1 [00:01<00:00,  1.14s/it]

 Generated 20 embeddings.





In [10]:
from pinecone import Pinecone, ServerlessSpec

# Create the Pinecone client; the API key is read from the environment
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])

# Name of the Pinecone index used by this notebook
index_name = "villambot"

# Collect the names of the indexes that already exist
existing_indexes = pc.list_indexes()
existing_index_names = [index.name for index in existing_indexes.indexes]

if index_name not in existing_index_names:
    # Create a new index if it doesn't exist
    pc.create_index(
        name=index_name,
        dimension=768,                          # Must match vector size from embedding model
        metric='cosine',                        # Cosine similarity is standard for RAG
        spec=ServerlessSpec(cloud='aws', region='us-east-1')  # Free-tier friendly setup
    )
    print(f"Created Pinecone index: {index_name}")

    # Poll the index status instead of sleeping a fixed 60 seconds: continue as
    # soon as the index reports ready, and never wait longer than the old sleep did.
    ready_deadline = time.monotonic() + 60
    while not pc.describe_index(index_name).status["ready"]:
        if time.monotonic() >= ready_deadline:
            print(f"Index {index_name} not reported ready after 60 seconds; continuing anyway.")
            break
        time.sleep(1)
else:
    print("Pinecone index already exists.")

# Connect to the index
pinecone_index = pc.Index(index_name)
print(f"Connected to Pinecone index: {index_name}")

Pinecone index already exists.
Connected to Pinecone index: villambot


In [11]:
# Upload vectors to Pinecone (the vector database)

BATCH_SIZE = 100  # how many vectors go into a single upsert call

# Each entry is an (id, values, metadata) tuple:
#   id       -> position of the chunk in the embedding list, as a string
#   values   -> the embedding vector for that chunk
#   metadata -> {"text": <original chunk text>}
vectors_to_upsert = [
    (str(position), vector, {"text": chunk_text})
    for position, (vector, chunk_text) in enumerate(zip(all_embeddings, all_batch_content))
]

# Upload vectors to Pinecone in batches 
def batch_upsert(index, vectors, batch_size=BATCH_SIZE, max_attempts=3, retry_wait=10):
    """
    Upload `vectors` to a Pinecone index in batches, retrying failed batches.

    Args:
        index: Object exposing ``upsert(vectors=...)``.
        vectors: Sequence of items to upload; sliced into consecutive batches.
        batch_size: Number of items sent per ``upsert`` call.
        max_attempts: Total attempts per batch before giving up. Must be >= 1.
        retry_wait: Seconds to sleep between attempts on the same batch.

    Raises:
        ValueError: If ``max_attempts`` is less than 1. Without this check every
            batch would be skipped without any upload or error.
        Exception: The error from the last attempt on a batch is re-raised, which
            stops the upload at that batch.
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be at least 1, got {max_attempts}")

    # Split the full list into smaller batches
    batches = [vectors[i:i + batch_size] for i in range(0, len(vectors), batch_size)]

    progress = tqdm(batches, desc="Upserting batches", total=len(batches))
    for batch_number, batch in enumerate(progress, start=1):
        for attempt in range(1, max_attempts + 1):
            try:
                index.upsert(vectors=batch)  # Send the batch to Pinecone
                break  # Success: move on to the next batch
            except Exception as e:
                print(f"Batch {batch_number}, Attempt {attempt} failed: {e}")
                if attempt == max_attempts:
                    print(f" Batch {batch_number} failed after {max_attempts} attempts.")
                    raise  # bare raise keeps the original traceback intact
                time.sleep(retry_wait)  # Wait before retrying

# Kick off the upload of every prepared vector
print("\n Starting Pinecone batched upserts...\n")
batch_upsert(index=pinecone_index, vectors=vectors_to_upsert)
print("\n Pinecone vector storage complete\n")


 Starting Pinecone batched upserts...



Upserting batches: 100%|█████████████████████████████████████████████████████████████████| 1/1 [00:08<00:00,  8.90s/it]


 Pinecone vector storage complete




