<a href="https://colab.research.google.com/github/prathameshks/FileChat-Using-RAG/blob/main/fileChat-With-RAG-Simplified.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install PyMuPDF spacy sentence-transformers bitsandbytes accelerate huggingface_hub



In [3]:
# Setup cell: imports, Hugging Face login, embedding model, tokenizer and LLM.
# Everything defined here (nlp, device, model, tokenizer, llm_model, chunk_embeddings,
# chunk_list) is module-level state used by the functions in the cells below.
print("[INFO] Importing Required Libraries")

import os
import requests
import fitz
from tqdm.auto import tqdm
from spacy.lang.en import English
from sentence_transformers import SentenceTransformer,util
import torch
import pandas as pd
import numpy as np
from huggingface_hub import login
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from transformers.utils import is_flash_attn_2_available
import torch
from google.colab import userdata, files
from IPython.display import display, Markdown
import random

print("[INFO] Loading Dependencies")
# English pipeline with only the 'sentencizer' component added; make_chunks uses it
# to split page text into sentences.
nlp = English()
nlp.add_pipe('sentencizer')
print("[INFO] Sentencizer Installed")

# Log in to the Hugging Face Hub with the value stored under 'HUGGINGFACE_TOKEN'
# in google.colab.userdata.
login(token=userdata.get('HUGGINGFACE_TOKEN'))
print("[INFO] Huggingface Login Successful")

# Use the GPU when one is available, otherwise the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Embedding model (all-mpnet-base-v2) used for both the chunks and the queries.
print("[INFO] Loading Embedding Model")
model = SentenceTransformer('all-mpnet-base-v2',device=device)
print("[INFO] Loaded Embedding Model all-mpnet-base-v2")


# 4-bit quantization config (requires the bitsandbytes and accelerate packages).
# NOTE(review): quantization_config is created here but is not passed to
# from_pretrained() below, so in this cell the LLM is loaded in float16 without it.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

# Use FlashAttention 2 when it is available and the major compute capability of
# GPU 0 is at least 8; otherwise fall back to "sdpa".
if (is_flash_attn_2_available()) and (torch.cuda.get_device_capability(0)[0] >= 8.0):
    attn_implementation = "flash_attention_2"
else:
    attn_implementation = "sdpa"
print(f"[INFO] Using Attention Implementation: {attn_implementation}")

# Instantiate the tokenizer for the chat model.
tokenizer = AutoTokenizer.from_pretrained("google/gemma-2-2b-it")
print("[INFO] Initialized Tokinizer for gemma-2-2b-it")

# Instantiate the LLM in float16 with the attention implementation chosen above.
print("[INFO] Loading LLM Model gemma-2-2b-it")
llm_model = AutoModelForCausalLM.from_pretrained(
    "google/gemma-2-2b-it",
    torch_dtype=torch.float16,
    low_cpu_mem_usage=True,
    attn_implementation=attn_implementation
)
print("[INFO] Loaded LLM Model gemma-2-2b-it")

# Move the LLM to the selected device.
llm_model.to(device)
print("[INFO] All Dependencies Loaded Successfully")

# Module-level placeholders; the start_with_* functions assign them via `global`.
chunk_embeddings = None
chunk_list = None

[INFO] Importing Required Libraries
[INFO] Loading Dependencies
[INFO] Sentencizer Installed
The token has not been saved to the git credentials helper. Pass `add_to_git_credential=True` in this function directly or `--add-to-git-credential` if using via `huggingface-cli` if you want to set the git credential as well.
Token is valid (permission: write).
Your token has been saved to /root/.cache/huggingface/token
Login successful
[INFO] Huggingface Login Successful
[INFO] Loading Embedding Model


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


[INFO] Loaded Embedding Model all-mpnet-base-v2
[INFO] Using Attention Implementation: sdpa
[INFO] Initialized Tokinizer for gemma-2-2b-it
[INFO] Loading LLM Model gemma-2-2b-it


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

[INFO] Loaded LLM Model gemma-2-2b-it
[INFO] All Dependencies Loaded Successfully


In [4]:
def extract_text_from_pdf(pdf_path:str) -> list[dict]:
    """Extract the text of every page of a PDF.

    Args:
        pdf_path: Path of the PDF file to open.

    Returns:
        One dict per page, in document order, with the keys
        ``page_no`` (0-based page index), ``text`` (page text with line breaks
        replaced by spaces and surrounding whitespace stripped),
        ``char_count`` (``len(text)``) and ``token_count`` (rough estimate,
        ``len(text) / 4``).
    """
    page_list = []
    # The context manager closes the document even if reading a page fails.
    with fitz.open(pdf_path) as doc:
        # Wrap the document itself (not enumerate(doc)) so tqdm can use len(doc)
        # and display a total.
        for page_no, page in enumerate(tqdm(doc)):
            # Line breaks become spaces: deleting them would glue the last word of
            # one line to the first word of the next ("the\ncat" -> "thecat").
            text = page.get_text().replace('\n', ' ').strip()

            page_list.append({
                'page_no': page_no,
                'text': text,
                'char_count': len(text),
                'token_count': len(text)/4
            })
    return page_list

In [5]:
# chunking
def make_chunks(page_list:list[dict], chunk_size:int = 10,overlap_sentences:int = 2,min_chunk_size:int = 100) -> list[dict]:
    """Split each page's text into overlapping chunks of sentences.

    Each page dict is annotated in place with ``sentences``, ``sentence_count``
    and ``chunks`` (the per-page list of sentence windows).

    Args:
        page_list: Page dicts containing at least ``page_no`` and ``text``.
        chunk_size: Maximum number of sentences per chunk.
        overlap_sentences: Number of sentences shared by consecutive chunks.
        min_chunk_size: Chunks whose joined text is shorter than this many
            characters are dropped.

    Returns:
        A flat list of chunk dicts with the keys ``page_no``, ``text``,
        ``sentence_count``, ``char_count`` and ``token_count`` (``char_count / 4``).

    Raises:
        ValueError: If ``overlap_sentences`` is negative or not smaller than
            ``chunk_size`` (the window would skip sentences or never advance).
    """
    step = chunk_size - overlap_sentences
    if overlap_sentences < 0 or step <= 0:
        raise ValueError(
            "chunk_size must be greater than overlap_sentences and "
            f"overlap_sentences must be >= 0 (got chunk_size={chunk_size}, "
            f"overlap_sentences={overlap_sentences})"
        )

    chunk_list = []
    for page in tqdm(page_list):
        # text to sentences
        sentences = [sent.text for sent in nlp(page['text']).sents]
        page['sentences'] = sentences
        page['sentence_count'] = len(sentences)

        # sentences to windows of at most chunk_size, advancing by `step`
        page['chunks'] = []
        for start in range(0, len(sentences), step):
            page['chunks'].append(sentences[start:start + chunk_size])
            # Once a window reaches the last sentence, any further window would
            # contain only sentences already present in this one, so stop.
            if start + chunk_size >= len(sentences):
                break

        # Turn each window into a standalone dict so later steps work per chunk
        # rather than per page.
        for chunk in page['chunks']:
            text = ' '.join(chunk)
            if len(text) < min_chunk_size:
                continue
            chunk_list.append({
                'page_no': page['page_no'],
                'text': text,
                'sentence_count': len(chunk),
                'char_count': len(text),
                'token_count': len(text)/4,
            })

    return chunk_list


In [6]:
def create_embeddings(chunk_list:list[dict]) -> list[dict]:
    """Embed every chunk's text and store the vector on the chunk.

    All texts are encoded in one batched call to the module-level embedding
    ``model`` (batch size 64, non-tensor output); each resulting vector is then
    written to the matching chunk under the ``'embedding'`` key.

    Args:
        chunk_list: Chunk dicts, each with a ``'text'`` entry.

    Returns:
        The same list object, with every chunk updated in place.
    """
    texts = [item['text'] for item in chunk_list]
    vectors = model.encode(texts, batch_size=64, convert_to_tensor=False)
    # Pair each chunk with the vector at the same position.
    for item, vector in tqdm(zip(chunk_list, vectors)):
        item['embedding'] = vector

    return chunk_list

In [7]:
def save_embeddings(chunk_list:list[dict],save_path:str = 'embeddings.csv') -> None:
    """Write the chunks (including their embeddings) to a CSV file.

    Each embedding is serialized explicitly as ``[v0 v1 v2 ...]`` (whitespace
    separated, full precision) instead of relying on numpy's ``str()`` of the
    array, which rounds values and abbreviates arrays of more than 1000 elements
    with ``...`` so they cannot be parsed back. The bracketed, whitespace-separated
    layout is what ``str_to_array`` parses when the file is loaded again.

    Args:
        chunk_list: Chunk dicts; the ``'embedding'`` entry, if present, must be
            array-like of numbers.
        save_path: Destination CSV path.
    """
    def _serialize(vector) -> str:
        # float64 holds every float32 exactly, and repr() of a Python float
        # round-trips, so no precision is lost.
        values = np.asarray(vector, dtype=np.float64).ravel().tolist()
        return '[' + ' '.join(repr(value) for value in values) + ']'

    df = pd.DataFrame(chunk_list)
    # An empty chunk list produces a frame without an 'embedding' column.
    if 'embedding' in df.columns:
        df['embedding'] = df['embedding'].apply(_serialize)
    df.to_csv(save_path,index=False)


In [8]:
def str_to_array(x:str) -> np.ndarray:
    """Parse the text form of a 1-D numeric array into a float32 numpy array.

    Accepts numpy-style text (``"[0.1 0.2\\n 0.3]"``, whitespace separated and
    possibly wrapped over several lines) as well as list-style text
    (``"[0.1, 0.2, 0.3]"``, comma separated).

    Args:
        x: Text form of the array, with or without surrounding brackets.

    Returns:
        A 1-D ``np.float32`` array; empty if ``x`` contains no numbers.
    """
    # Commas become spaces so both layouts reduce to whitespace-separated numbers;
    # split() with no argument also treats newlines as separators.
    tokens = x.strip().strip('[]').replace(',', ' ').split()
    return np.array(tokens, dtype=np.float32)

def load_embeddings(save_path:str = 'embeddings.csv') -> list[dict]:
    """Read a chunk CSV and return its rows as chunk dicts.

    The ``'embedding'`` column is stored as text in the CSV; every cell is
    converted to a numpy array with ``str_to_array`` before the rows are
    turned into dicts.

    Args:
        save_path: Path of the CSV file to read.

    Returns:
        One dict per CSV row, keyed by column name.
    """
    frame = pd.read_csv(save_path)
    parsed_column = frame['embedding'].map(str_to_array)
    return frame.assign(embedding=parsed_column).to_dict(orient='records')


In [9]:
# creating a function to get top k result from embeddings
def return_relevant_top_k(query:str,
                          embeddings:torch.tensor,
                          embedding_model:SentenceTransformer = model,
                          k:int = 5,
                          device:torch.device = device
):
    """Return the top-k dot-product matches between a query and the chunk embeddings.

    input:
        query: str Query to be searched
        embeddings: torch.tensor Embeddings in which to search, one row per chunk
        embedding_model: SentenceTransformer Embedding model used to encode the query
        k: int = 5 Maximum number of results; clamped to the number of rows
            in ``embeddings`` so a small document does not make ``torch.topk`` fail
        device: torch.device Kept for backward compatibility; not used, because the
            query embedding is moved to the device that holds ``embeddings``
    output:
        The result of ``torch.topk`` on the dot scores: an object with ``.values``
        (scores, highest first) and ``.indices`` (row positions in ``embeddings``).
    raises:
        ValueError: if ``embeddings`` is None (nothing has been loaded yet).
    """
    if embeddings is None:
        raise ValueError("embeddings is None: create or load chunk embeddings before querying")

    # create embedding of query
    query_embedding = embedding_model.encode(query, convert_to_tensor=True)
    # keep both operands of the dot product on the same device
    if isinstance(embeddings, torch.Tensor):
        query_embedding = query_embedding.to(embeddings.device)
    # dot product; row 0 holds the scores of the single query
    dot_scores = util.dot_score(a=query_embedding,b=embeddings)[0]
    # torch.topk raises when k exceeds the number of scores, so clamp it
    k = max(0, min(k, dot_scores.shape[0]))
    # getting top k index and scores
    top_k_dot_products = torch.topk(dot_scores,k=k)

    return top_k_dot_products

In [10]:
def get_relevent_top_k(top_k_dot_products:torch.tensor,chunk_list:list[dict]):
    """Look up the chunks selected by a top-k result.

    Args:
        top_k_dot_products: Object exposing parallel ``values`` (scores) and
            ``indices`` (positions into ``chunk_list``).
        chunk_list: Chunk dicts with at least the keys ``text``, ``page_no``,
            ``sentence_count``, ``char_count`` and ``token_count``.

    Returns:
        One dict per selected chunk, in the order of ``top_k_dot_products``,
        holding ``score`` plus the five chunk fields listed above (the chunk's
        embedding is not copied).
    """
    copied_fields = ("text", "page_no", "sentence_count", "char_count", "token_count")
    results = []
    for score, index in zip(top_k_dot_products.values, top_k_dot_products.indices):
        source_chunk = chunk_list[index]
        entry = {"score": score}
        entry.update((field, source_chunk[field]) for field in copied_fields)
        results.append(entry)
    return results

In [11]:
def prompt_formatter(query: str,
                     context_items: list[dict]) -> str:
    """Build the chat-formatted prompt for a query and its retrieved context.

    The ``'text'`` of every context item is joined into one context block, the
    block and the query are inserted into the instruction template, and the
    result is wrapped as a single user turn by the module-level ``tokenizer``'s
    chat template (returned as text, with the generation prompt appended).

    Args:
        query: The user's question.
        context_items: Retrieved chunk dicts, each with a ``'text'`` entry.

    Returns:
        The prompt string produced by ``tokenizer.apply_chat_template``.
    """
    template = """Please read the following context items, and then answer the question below based on the provided context, If Context is not sufficient then answer question on your own,

Context:
{context}

Question:
{query}

Answer:"""
    joined_context = ", \n".join(item['text'] for item in context_items)

    # The whole instruction is sent as one user message.
    user_turn = {
        "role": "user",
        "content": template.format(context=joined_context, query=query),
    }

    return tokenizer.apply_chat_template(conversation=[user_turn],
                                         tokenize=False,
                                         add_generation_prompt=True)

In [12]:
def ask(
    query:str,
    chunk_embeddings:torch.tensor,
    chunk_list:list[dict],
    embb_model:SentenceTransformer = model,
    llm_model:AutoModelForCausalLM = llm_model,
    tokenizer:AutoTokenizer = tokenizer,
    k:int = 5,
    temperature:float=0.7,
    max_new_tokens:int=512,
    device:torch.device = device if device else torch.device("cpu")
) -> str:
    """Answer a question about the loaded document with retrieval-augmented generation.

    Steps: retrieve the ``k`` best-matching chunks, build the prompt from them,
    sample a completion from the LLM, and return only the newly generated text.

    Args:
        query: The user's question.
        chunk_embeddings: Embeddings of all chunks, one row per chunk.
        chunk_list: Chunk dicts aligned with the rows of ``chunk_embeddings``.
        embb_model: Embedding model used to encode the query.
        llm_model: Causal language model used for generation.
        tokenizer: Tokenizer used to encode the prompt and decode the output.
        k: Number of chunks to retrieve as context.
        temperature: Sampling temperature passed to ``generate``.
        max_new_tokens: Maximum number of tokens to generate.
        device: Device the tokenized prompt is moved to before generation.

    Returns:
        The generated answer, without the prompt and without special tokens.
    """
    # get top k results ie retrieval
    top_k_results = return_relevant_top_k(query,chunk_embeddings,embb_model,k=k,device=device)
    context_items = get_relevent_top_k(top_k_results,chunk_list)

    # make prompt (already chat-templated text)
    prompt = prompt_formatter(query,context_items)

    # The chat template has already inserted its special tokens, so the tokenizer
    # must not add them again (it would otherwise prepend a second BOS token).
    model_inputs = tokenizer(prompt,return_tensors="pt",add_special_tokens=False).to(device)

    # generate output
    outputs = llm_model.generate(**model_inputs,temperature=temperature,max_new_tokens=max_new_tokens,do_sample=True)

    # generate() returns prompt tokens followed by new tokens. Slicing by the prompt
    # length is reliable, whereas removing the prompt text from the decoded string
    # fails whenever decoding does not reproduce the prompt exactly.
    prompt_length = model_inputs["input_ids"].shape[1]
    generated_tokens = outputs[0][prompt_length:]

    return tokenizer.decode(generated_tokens,skip_special_tokens=True)

In [13]:
def prity_output(query:str,response:str):
    """Print the query and render the model's response as Markdown.

    Special tokens left in the decoded text are removed before display. Besides
    the tokens handled before (``<bos>``, ``<|endoftext|>``, ``</s>``) this now
    also covers ``<eos>`` and ``<end_of_turn>``, which the Gemma model loaded in
    this notebook emits at the end of its turn.

    Args:
        query: The question that was asked.
        response: Raw text returned by the model.
    """
    special_tokens = ("<bos>", "<eos>", "<end_of_turn>", "<|endoftext|>", "</s>")

    print(f"Query: {query}")
    print("Response: ")
    for token in special_tokens:
        response = response.replace(token, "")
    display(Markdown(response))



In [14]:
def upload_file()->str:
    """Open the Colab upload dialog and return the first uploaded file's name.

    Returns:
        The first key of the mapping returned by ``files.upload()``.

    Raises:
        IndexError: If nothing was uploaded.
    """
    uploaded_by_name = files.upload()
    filenames = [*uploaded_by_name.keys()]
    return filenames[0]

In [15]:
# start with pdf
def start_with_pdf():
    """Upload a PDF, chunk and embed it, save the embeddings, and publish the results.

    On completion the module-level ``chunk_list`` holds the embedded chunks and
    ``chunk_embeddings`` holds their vectors as a float32 tensor on ``device``.
    The embeddings are always written to ``embeddings.csv``; the user is asked
    whether the file should also be downloaded.
    """
    global chunk_embeddings,chunk_list
    print("Upload PDF File Below.")
    pdf_name = upload_file()
    print("[INFO] PDF Uploaded Successfully, Processing PDF.")
    page_list = extract_text_from_pdf(pdf_name)
    chunk_list = make_chunks(page_list)

    print("[INFO] Creating embedings from PDF.")
    chunk_list = create_embeddings(chunk_list)
    print("[INFO] Embeddings Created Successfully, Saving Embeddings.")
    save_embeddings(chunk_list)
    # give option to download embeddings
    print()
    choice = input("Do you want to download embeddings? (y/n) :")
    # startswith() instead of choice[0]: pressing Enter yields an empty string,
    # for which choice[0] raises IndexError. An empty answer now means "no".
    if choice.strip().lower().startswith("y"):
        files.download('embeddings.csv')
        print("[INFO] Embeddings Downloaded Successfully, If not then you can find it in sidebar in files option.")
    else:
        print("[INFO] You can find embeddings in files option in sidebar.")

    # Stack the per-chunk vectors into one array, then publish it as a tensor
    # through the global chunk_embeddings.
    numpy_embeddings = np.array([chunk['embedding'] for chunk in tqdm(chunk_list)])
    chunk_embeddings = torch.tensor(numpy_embeddings, dtype=torch.float32).to(device)
    # complete
    print("[INFO] PDF Processing Completed Successfully.")

# start with embeddings
def start_with_embeddings():
    """Upload a previously saved embeddings CSV and load it into the module state.

    After the call the global ``chunk_list`` holds the loaded chunk dicts and the
    global ``chunk_embeddings`` holds their vectors as a float32 tensor on ``device``.
    """
    global chunk_embeddings,chunk_list
    print("Upload Embeddings File Below(In CSV Format).")
    uploaded_name = upload_file()
    print("[INFO] Embeddings Uploaded Successfully, Loading Embeddings.")
    chunk_list = load_embeddings(uploaded_name)

    # Gather the per-chunk numpy vectors, combine them into one array, then
    # build the tensor from that single array.
    vectors = [record['embedding'] for record in tqdm(chunk_list)]
    stacked = np.array(vectors)
    chunk_embeddings = torch.tensor(stacked, dtype=torch.float32).to(device)

    print("[INFO] Embeddings Loaded Successfully")

def start_with_embeddings_without_upload(embedding_file:str = "embeddings.csv"):
    """Load an embeddings CSV that already exists in the runtime's file system.

    After the call the global ``chunk_list`` holds the loaded chunk dicts and the
    global ``chunk_embeddings`` holds their vectors as a float32 tensor on ``device``.

    Args:
        embedding_file: Path of the CSV to load. Defaults to ``"embeddings.csv"``,
            the path that was previously hard-coded here.
    """
    global chunk_embeddings,chunk_list
    # Nothing is uploaded in this path, so the message reports the file being read.
    print(f"[INFO] Loading Embeddings from {embedding_file}.")
    chunk_list = load_embeddings(embedding_file)
    # Convert the list of NumPy arrays into a single NumPy array, then into a tensor
    numpy_embeddings = np.array([chunk['embedding'] for chunk in tqdm(chunk_list)])
    chunk_embeddings = torch.tensor(numpy_embeddings, dtype=torch.float32).to(device)

    print("[INFO] Embeddings Loaded Successfully")

In [16]:
# Show the menu, read the user's choice, and run the matching loader.
print(
    "Choose option from Below",
    "1. Use PDF",
    "2. Use Existing Embeddings",
    "3. Use Existing Embeddings Without Uploading",
    sep="\n",
)
choice = input("Enter your choice: ")
# Dispatch table: any choice that is not a key falls through to the error message.
{
    "1": start_with_pdf,
    "2": start_with_embeddings,
    "3": start_with_embeddings_without_upload,
}.get(choice, lambda: print("Invalid Choice"))()

Choose option from Below
1. Use PDF
2. Use Existing Embeddings
3. Use Existing Embeddings Without Uploading
Enter your choice: 3
[INFO] Embeddings Uploaded Successfully, Loading Embeddings.


  0%|          | 0/56 [00:00<?, ?it/s]

[INFO] Embeddings Loaded Successfully


In [None]:
%%time
query = input("Enter Question to PDF:")
prity_output(query,ask(query,chunk_embeddings,chunk_list))

Enter Question to PDF:what is target function and its importance
