In [7]:
from transformers import AutoTokenizer, AutoModel
import numpy as np
import os
from dotenv import load_dotenv
import re
import uuid
import torch
import json
import tqdm as notebook_tqdm
import google.generativeai as genai

load_dotenv()

True

# Chunking

In [8]:
def _store_chunk(chunk_collection,words,separator,sku):
  """Join ``words`` with ``separator`` and store the chunk under a fresh UUID key."""
  chunk_id=str(uuid.uuid4())
  chunk_collection[chunk_id]={"TEXT":separator.join(words),"metadata": {"filename": sku}}


def chunking(directory_path,tokenizer,chunk_size,para_separator="\n\n",separator=" "):
  """
  Split every regular file in a directory into token-bounded text chunks.

  Each file is split into paragraphs on ``para_separator`` and then on single
  line breaks; blank paragraphs are dropped. Words (pieces obtained by
  splitting a paragraph on ``separator``) are packed greedily into a chunk
  until adding the next word would exceed ``chunk_size`` tokens. Chunks never
  span two paragraphs. A single word that is longer than ``chunk_size`` tokens
  is not split; it becomes a chunk of its own that exceeds the limit.

  Args:
    directory_path (str): Directory whose regular files are read as UTF-8 text.
      Sub-directories are skipped.
    tokenizer: Object exposing ``tokenize(str)`` returning a sequence; its
      length is used as the token count of a word.
    chunk_size (int): Maximum number of tokens per chunk.
    para_separator (str): Literal paragraph separator. An empty string
      disables this split (line breaks still separate paragraphs).
    separator (str): Word separator used both to split and to re-join words.

  Returns:
    dict: ``{doc_id: {chunk_id: {"TEXT": str, "metadata": {"filename": str}}}}``
    where both ids are random UUID4 strings and ``filename`` is the file name
    without its extension. A file with no non-blank text maps to ``{}``.
  """
  documents={}

  # Sorted so that document order does not depend on the filesystem's listing order.
  for filename in sorted(os.listdir(directory_path)):
    file_path=os.path.join(directory_path,filename)
    if not os.path.isfile(file_path):
      continue

    with open(file_path,'r',encoding='utf-8') as file:
      text=file.read()

    sku=os.path.splitext(filename)[0]
    doc_id=str(uuid.uuid4())
    chunk_collection={}

    # Honour the caller's paragraph separator (str.split rejects an empty one),
    # then treat every remaining line break as a paragraph boundary as well.
    blocks=text.split(para_separator) if para_separator else [text]
    paragraphs=[]
    for block in blocks:
      paragraphs.extend(line.strip() for line in block.split("\n"))
    paragraphs=[p for p in paragraphs if p]

    for paragraph in paragraphs:
      current_chunk=[]
      current_chunk_len=0

      for word in paragraph.split(separator):
        word_tokens=len(tokenizer.tokenize(word))
        if current_chunk_len+word_tokens<=chunk_size:
          current_chunk.append(word)
          current_chunk_len+=word_tokens
          continue

        # The word does not fit: close the running chunk and start a new one with it.
        if current_chunk:
          _store_chunk(chunk_collection,current_chunk,separator,sku)
        current_chunk=[word]
        current_chunk_len=word_tokens

      # Store whatever is left of the paragraph.
      if current_chunk:
        _store_chunk(chunk_collection,current_chunk,separator,sku)

    documents[doc_id]=chunk_collection

  return documents
          




# Indexing

In [9]:
# model_name= "BAAI/llm-embedder"

# tokenizer=AutoTokenizer.from_pretrained(model_name)
# model=AutoModel.from_pretrained(model_name)

def map_document_embeddings(documents,tokenizer,model_name):
    """
    Compute one embedding vector for every chunk of every document.

    The model named ``model_name`` is loaded with ``AutoModel.from_pretrained``
    and run on CUDA when available, otherwise on the CPU. A chunk's embedding
    is the mean over dimension 1 of the model's ``last_hidden_state`` (the
    token axis for Hugging Face encoder outputs), returned as a plain list.

    Args:
        documents (dict): ``{doc_id: {chunk_id: {"TEXT": str, ...}}}``.
        tokenizer: Callable tokenizer returning a mapping of tensors when
            called with ``return_tensors="pt"``.
        model_name (str): Identifier passed to ``AutoModel.from_pretrained``.

    Returns:
        dict: ``{doc_id: {chunk_id: list[float]}}`` with an entry for EVERY
        chunk. A document without chunks maps to ``{}``.
    """
    # Fall back to the CPU instead of crashing on machines without a GPU.
    device="cuda" if torch.cuda.is_available() else "cpu"
    model=AutoModel.from_pretrained(model_name)
    model.to(device)

    mapped_document_db={}
    for doc_id, dict_content in documents.items():
        # One dict per document, filled across all of its chunks. (Creating it
        # inside the chunk loop would keep only the last chunk's embedding.)
        chunk_vectors={}
        for chunk_id, chunk_content in dict_content.items():
            text=chunk_content.get("TEXT")
            inputs=tokenizer(text,return_tensors="pt",padding=True,truncation=True)
            inputs={k: v.to(device) for k, v in inputs.items()}
            with torch.no_grad():
                pooled=model(**inputs).last_hidden_state.mean(dim=1)
            chunk_vectors[chunk_id]=pooled.squeeze().cpu().tolist()
        mapped_document_db[doc_id]=chunk_vectors

    return mapped_document_db


# Retrieval

In [10]:
def compute_embeddings(query,tokenizer,model_name):
    """
    Embed a query string with the model named ``model_name``.

    The model is loaded with ``AutoModel.from_pretrained`` on every call and
    run on CUDA when available, otherwise on the CPU. The embedding is the
    mean over dimension 1 of the model's ``last_hidden_state``.

    Args:
        query (str): Text to embed.
        tokenizer: Callable tokenizer returning a mapping of tensors when
            called with ``return_tensors="pt"``.
        model_name (str): Identifier passed to ``AutoModel.from_pretrained``.

    Returns:
        list[float]: The pooled embedding as a plain Python list.
    """
    # Fall back to the CPU instead of crashing on machines without a GPU.
    device="cuda" if torch.cuda.is_available() else "cpu"
    model=AutoModel.from_pretrained(model_name)
    model.to(device)

    query_inputs=tokenizer(query,return_tensors="pt",padding=True,truncation=True)
    query_inputs={k: v.to(device) for k, v in query_inputs.items()}
    with torch.no_grad():
        pooled=model(**query_inputs).last_hidden_state.mean(dim=1)

    return pooled.squeeze().cpu().tolist()

def cosine_sim_score(query_embeddings,chunk_embeddings):
    """
    Cosine similarity between a query vector and a chunk vector.

    Returns the integer 0 when either vector has zero Euclidean norm;
    otherwise the dot product divided by the product of the two norms.
    """
    chunk_norm=np.linalg.norm(chunk_embeddings)
    query_norm=np.linalg.norm(query_embeddings)

    # Guard against division by zero for all-zero vectors.
    if chunk_norm==0 or query_norm==0:
        return 0

    return np.dot(query_embeddings,chunk_embeddings)/(query_norm*chunk_norm)

def get_top_k_scores(query_embeddings,mapped_document_db,top_k):
    """
    Score every stored chunk against the query and keep the best ``top_k``.

    Args:
        query_embeddings: Query vector (sequence of numbers).
        mapped_document_db (dict): ``{doc_id: {chunk_id: embedding}}``.
        top_k (int): Number of best-scoring entries to keep.

    Returns:
        list: ``[((doc_id, chunk_id), score), ...]`` sorted by descending
        score and truncated with ``[:top_k]``. Empty when the store is empty.
    """
    scores={}

    for doc_id, chunk_dict in mapped_document_db.items():
        for chunk_id, chunk_embeddings in chunk_dict.items():
            # The stored list goes straight to cosine_sim_score; the previous
            # per-chunk np.array copy was never used.
            scores[(doc_id,chunk_id)]=cosine_sim_score(query_embeddings,chunk_embeddings)

    # sorted() is stable, so equal scores keep their insertion order.
    ranked=sorted(scores.items(),key=lambda item: item[1],reverse=True)

    return ranked[:top_k]

def retrieve_top_results(sorted_scores):
    """
    Flatten ``((doc_id, chunk_id), score)`` pairs into ``(doc_id, chunk_id, score)`` tuples.

    The order of ``sorted_scores`` is preserved.
    """
    return [(doc_id,chunk_id,score) for (doc_id,chunk_id),score in sorted_scores]

def save_json(path,data):
    """
    Write ``data`` to ``path`` as JSON with a 4-space indent.

    Missing parent directories are created first, so saving into a fresh
    output folder does not fail. The file is written as UTF-8.

    Args:
        path (str): Destination file path.
        data: Any JSON-serialisable object.
    """
    parent=os.path.dirname(path)
    if parent:
        os.makedirs(parent,exist_ok=True)

    with open(path,'w',encoding='utf-8') as f:
        json.dump(data,f,indent=4)
        
def read_json(path):
    """
    Load and return the JSON document stored at ``path``.

    The file is decoded as UTF-8 rather than with the platform default
    encoding, so non-ASCII text loads the same way on every OS.
    """
    with open(path,'r',encoding='utf-8') as f:
        return json.load(f)

def retrieve_text(top_results, document):
    """
    Look up the stored chunk for the best-ranked result.

    Only the first entry of ``top_results`` is used; its first two fields are
    taken as the document id and chunk id and used to index ``document``.
    """
    doc_id, chunk_id = top_results[0][0], top_results[0][1]
    return document[doc_id][chunk_id]

# LLM response


In [11]:
def generate_llm_response(gemini_model,query,relevant_text):
    """
    Ask the model to answer ``query`` using the retrieved chunk as context.

    The prompt embeds ``relevant_text["TEXT"]`` between ``<context>`` tags,
    followed by the question. It is sent through
    ``gemini_model.generate_content`` and the ``text`` attribute of the
    response is returned.
    """
    context = relevant_text["TEXT"]
    prompt = f"""
    You are an intelligent search engine. You will be provided with some retrieved context, as well as the users query.

    Your job is to understand the request, and answer based on the retrieved context.
    Here is context:

    <context>
    {context}
    </context>

    Question: {query}
    """
    return gemini_model.generate_content(prompt).text

In [12]:
# End-to-end run: chunk the documents, embed them, persist both stores,
# retrieve the best chunk for a query and ask Gemini to answer from it.

# --- Configuration ---
directory_path="./documents"

embedding_model_name="BAAI/bge-small-en-v1.5"
tokenizer= AutoTokenizer.from_pretrained(embedding_model_name)

chunk_size=200
# Two real newline characters. The previous value "\\n\\n" was the four
# literal characters backslash-n-backslash-n, which never occur in the text.
para_separator="\n\n"
separator=" "
top_k=3

# The API key is read from the environment (.env is loaded in the import cell).
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
model_name="gemini-2.0-flash-lite"
gemini_model = genai.GenerativeModel(model_name)

# --- Chunking and indexing ---
documents = chunking(directory_path,tokenizer,chunk_size,para_separator,separator)

mapped_document_db=map_document_embeddings(documents,tokenizer,embedding_model_name)

# Make sure the output folder exists before writing the two stores.
os.makedirs('database',exist_ok=True)
save_json('database/doc_store_2.json',documents)
save_json('database/vector_store_2.json',mapped_document_db)

# --- Retrieval ---
query = "why toddlers throw tantrums?"
query_embeddings = compute_embeddings(query, tokenizer, embedding_model_name)
sorted_scores = get_top_k_scores(query_embeddings, mapped_document_db, top_k)
top_results = retrieve_top_results(sorted_scores)

# Read the chunk texts back from disk, which also checks the saved store loads.
document_data = read_json("database/doc_store_2.json")

# Only the best-ranked of the top_k results is passed to the LLM.
relavent_text = retrieve_text(top_results, document_data)

print("Relevant Text:\n", relavent_text)

# --- Generation (requires GEMINI_API_KEY) ---
response = generate_llm_response(gemini_model, query, relavent_text)
print("\nLLM Response:\n", response)


Relevant Text:
 {'TEXT': "Make sure your child gets enough sleep. With too little sleep, kids can become hyper, disagreeable, and have extremes in behavior. Getting enough sleep can greatly reduce tantrums. Find out how much sleep is needed at your child’s age. Most kids'\xa0sleep needs\xa0fall within a set range of hours based on their age, but each child is unique.", 'metadata': {'filename': 'behaviuor1'}}

LLM Response:
 Getting enough sleep can greatly reduce tantrums. Kids can become hyper, disagreeable, and have extremes in behavior with too little sleep.

