In [1]:
!git clone https://github.com/rohitmishra94/pdf-rag.git

Cloning into 'pdf-rag'...
remote: Enumerating objects: 23, done.[K
remote: Counting objects: 100% (23/23), done.[K
remote: Compressing objects: 100% (21/21), done.[K
remote: Total 23 (delta 8), reused 12 (delta 2), pack-reused 0 (from 0)[K
Receiving objects: 100% (23/23), 304.64 KiB | 1.10 MiB/s, done.
Resolving deltas: 100% (8/8), done.


In [66]:
!pip install -r /content/pdf-rag/requirements.txt



In [65]:
# rm -r /content/chromadb_folder

In [1]:
from chromadb.utils import embedding_functions
from FlagEmbedding import FlagReranker
import chromadb
from chromadb import Documents, EmbeddingFunction, Embeddings
from FlagEmbedding import BGEM3FlagModel
import pymupdf4llm
from tqdm import tqdm
import numpy as np
from tenacity import retry, wait_random_exponential, stop_after_attempt
from tqdm import tqdm
from openai import OpenAI, AsyncOpenAI
import os
from typing import List, Dict, Any, Optional
import logging.config
import json
import asyncio
from dotenv import load_dotenv
import tiktoken


print('env variable loaded: ',load_dotenv('/content/env'))

logger = logging.getLogger(__name__)

AsyncClient = AsyncOpenAI()

env variable loaded:  True


In [46]:
await check_pdf_page_for_index('/content/pdf-rag/handbook.pdf')

checking for index..
index saved at  handbook.pdf_index.txt


In [54]:
with open('/content/handbook.pdf_index.txt','r') as file:
  f= file.readline().strip()
f

'/content/pdf-rag/handbook.pdf'

In [18]:
#!/usr/bin/env python
# coding: utf-8

# In[1]:




def embedding_function_bge(text_list):
    return model.encode(text_list, return_dense=True)['dense_vecs']



class MyEmbeddingFunction(EmbeddingFunction):
    def __call__(self, input: Documents) -> Embeddings:
        embeddings = embedding_function_bge(input)
        return embeddings

model = BGEM3FlagModel('BAAI/bge-m3',  use_fp16=True)
default_ef = MyEmbeddingFunction()
client = chromadb.PersistentClient(path="chromadb_folder")


# In[2]:


# embedding_function_bge(['iam there'])
# default_ef(['iam there'])


# In[3]:


def process_texts(texts, chunk_size=100, overlap=30):
    """Process a list of texts, splitting them into chunks of specified size with overlap,
    and accumulating shorter texts."""
    accumulated_words = []  # Accumulate words from texts shorter than chunk_size
    final_chunks = []  # Store the final chunks of text

    for text in texts.split():
        accumulated_words.append(text)

        while len(accumulated_words) >= chunk_size:
            # Take the first chunk_size words for the current chunk
            chunk = " ".join(accumulated_words[:chunk_size])
            final_chunks.append(chunk)
            # Remove words from the start of the accumulated_words, considering overlap
            accumulated_words = accumulated_words[chunk_size - overlap:]

    # If there are any remaining words, form the last chunk
    if accumulated_words:
        final_chunks.append(" ".join(accumulated_words))

    return final_chunks

def get_unique_text_indices(text_list):
    unique_texts = {}
    unique_indices = []

    for i, text in enumerate(text_list):
        if text not in unique_texts:
            unique_texts[text] = i
            unique_indices.append(i)

    return unique_indices

def count_tokens(text: str, model: str = "gpt-4o-mini") -> int:

    try:
        # Get the tokenizer for the specified model
        tokenizer = tiktoken.encoding_for_model(model)
    except KeyError:
        # Default to a generic encoding if the model is unknown
        tokenizer = tiktoken.get_encoding("cl100k_base")

    # Tokenize the text and return the token count
    token_count = len(tokenizer.encode(text))
    return token_count


# In[4]:
import fitz
check_pdf_prompt = ''' Analyze the page content and return True if page as table of content information.
return json output {'toc': true or false}. pdf_page is
'''
async def check_pdf_page_for_index(pdf_path):
    scanned_pages_dict = {}
    doc = fitz.open(pdf_path)
    doc_name = doc.name.split('/')[-1]
    # Iterate through each page
    print('checking for index..')
    for page_num in range(10):
        page = doc.load_page(page_num)  # Load the current page
        text = page.get_text()
        msg = [{"role": "system", "content": check_pdf_prompt + f'{text}'}]
        response = await chat_completion_request(msg)
        output = json.loads(response.choices[0].message.content)
        # print(output)
        scanned_pages_dict[page_num] = output['toc']
    # return scanned_pages_dict

    index_pages = [k for k,v in scanned_pages_dict.items() if v==True]
    index_text = [doc.load_page(i).get_text() for i in index_pages]
    if index_text:
        collection_name = pdf_path.split('/')[-1]
        with open(f'{collection_name}_index.txt','w') as f:
            f.write(pdf_path+'\n')
            f.write('\n'.join(index_text))
            print('index saved at ',f'{collection_name}_index.txt')
        return f'{collection_name}_index.txt'
    else:
      return 'no index found'

# await check_pdf_page_for_index('pdf-rag/handbook.pdf')


async def create_pdf_collection(pdf_path):
    """
    Process a PDF file and add its chunks to a collection.

    Args:
        pdf_path: Path to the PDF file
        collection: The collection object to add documents to
    """
    try:
        md_text = pymupdf4llm.to_markdown(pdf_path,show_progress=True)
        _ = await check_pdf_page_for_index(pdf_path)
        all_chunks = process_texts(md_text, chunk_size=500, overlap=50)
        index_info = await check_pdf_page_for_index(pdf_path)
        collection_name = pdf_path.split('/')[-1]
        collection = client.get_or_create_collection(name=collection_name,embedding_function=default_ef)
        logger.info(collection)

        for idx, chunk in tqdm(enumerate(all_chunks)):
            id_ = str(idx)
            collection.add(
                documents=[chunk],
                ids=[id_]
            )
        status = 'success'
        return status,collection_name,index_info

    except Exception as e:
        logger.error(f"Error creating pdf collection: {str(e)}", exc_info=True)
        return f'Sorry for inconvenience. Error creating pdf collection. Please contact support.'


# In[5]:


# create_pdf_collection('/workspace/test/pdf-rag/handbook.pdf')


# In[6]:


def get_collections():
    return client.list_collections()

def load_collection(collection_name):
    try:
        collection = client.get_or_create_collection(name=collection_name,embedding_function=default_ef)
        status = 'success'
        return status, collection
    except Exception as e:
        logger.error(f"Collection load request failed: {str(e)}", exc_info=True)
        raise


# In[ ]:





# In[7]:


def get_full_context(query, collection, n_results=5, top=2):
    logger.info(f'quering collection---> {collection}')

    result = collection.query(query_texts = query,n_results=n_results)
    texts = result['documents'][0]
    ids = result['ids'][0]
    unique_indices = get_unique_text_indices(texts)
    unique_docs = [texts[x] for x in unique_indices]
    unique_ids = [ids[x] for x in unique_indices]
    ## colbert
    query_col = model.encode([query],return_colbert_vecs=True)
    docs_col = model.encode(unique_docs,return_colbert_vecs=True)
    colber_scores = []
    for vectors in docs_col['colbert_vecs']:
        colber_scores.append(model.colbert_score(query_col['colbert_vecs'][0],vectors).numpy())

    ## full_context_colbert
    full_context_scores = []
    full_context_ids = []
    for id in unique_ids:
        pre_id,post_id = str(int(id)-1), str(int(id)+1)
        # print(pre_id,id,post_id)
        full_context_ids.append([pre_id,id,post_id])
        full_context=collection.get(ids=[f'{pre_id}',f'{id}',f'{post_id}'])['documents']
        full_context = ''.join(full_context)
        full_context_colber_vec = model.encode([full_context],return_colbert_vecs=True)
        full_context_colber_score = model.colbert_score(query_col['colbert_vecs'][0],full_context_colber_vec['colbert_vecs'][0]).numpy()

        full_context_scores.append(full_context_colber_score)

    all_scores = [2*full_context_scores[i]+0.9*colber_scores[i] for i in range(len(colber_scores))]
    sorted_indices = [index for index, _ in sorted(enumerate(all_scores), key=lambda x: x[1], reverse=True)]
    top_context_ids_list = [full_context_ids[index] for index in sorted_indices][:top]
    flattened_list = np.array(top_context_ids_list).flatten().tolist()
    top_ids = list(set(flattened_list))
    top_context = collection.get(ids=top_ids)['documents']

    logger.info(f'context retrieved from collection---> {collection}')
    return top_context, top_ids


# In[8]:


async def generate_response(params: Dict[str, Any]) -> Any:
    """Generate response using OpenAI API with error handling and logging."""
    try:
        logger.info(f"Generating response with model: {params.get('model')}")
        response = await AsyncClient.chat.completions.create(**params)
        logger.info("Response generated successfully")
        return response
    except Exception as e:
        logger.error(f"Error generating response: {str(e)}", exc_info=True)
        return f'Sorry for inconvenience. Please contact support.'

@retry(wait=wait_random_exponential(min=1, max=40), stop=stop_after_attempt(3))
async def chat_completion_request(messages: List[Dict], model='gpt-4o-mini') -> Any:
    """Make a chat completion request with retry logic."""
    try:
        params = {
            'messages': messages,
            'max_tokens': 1000,
            'model': model,
            'temperature': 0,
            'response_format': {"type": "json_object"}
        }

        response = await generate_response(params)
        return response
    except Exception as e:
        logger.error(f"Chat completion request failed: {str(e)}", exc_info=True)
        return f'Sorry for inconvenience. Please contact support.'



# In[9]:

async def get_toc_content(query,index_name):


  with open(f'{index_name}','r') as f:
    pdf_path = f.readline().strip()
    content = f.read()
    prompt = f'''
  Your task is to return list of page no which may contain information regarding {query}
  based on this table of content {content} output format: json page:[page no]
  '''
    msg = [{"role": "system", "content": prompt}]
    response = await chat_completion_request(msg)
    output = json.loads(response.choices[0].message.content)

    doc = fitz.open(pdf_path)
    print('looking for context at pages: ', output['page'])
    context = [doc.load_page(page_num-1).get_text() for page_num in output['page']]

    user_query = f'Based on below CONTEXT {context} ANSWER the query {query}'

    system_instruction = '''Ideal Output Format
The output should be a structured JSON blob that question with its corresponding answer.
Answers should be word to word match if the question is a word to word match
If the CONTEXT is insufficient, reply with “Data Not Available'''

    msg = [{"role": "system", "content": system_instruction},{"role": "user", "content": user_query}]

    response = await chat_completion_request(msg)
    total_tokens = response.usage.total_tokens
    output = json.loads(response.choices[0].message.content)

    return output,total_tokens



def check_file_exists(file_path):
    return os.path.isfile(file_path)

async def get_answer(query,collection_name):
    # context = get_context(query,n_results=5,top_results=2,threshold = 0.5)

    collection = client.get_or_create_collection(name=collection_name,embedding_function=default_ef)

    context, context_idx = get_full_context(query, collection, n_results=5, top=2)
    user_query = f'Based on below CONTEXT {context} ANSWER the query {query}'

    system_instruction = '''Ideal Output Format
The output should be a structured JSON blob that question with its corresponding answer.
Answers should be word to word match if the question is a word to word match
If the CONTEXT is insufficient, reply with “Data Not Available'''

    msg = [{"role": "system", "content": system_instruction},{"role": "user", "content": user_query}]

    response = await chat_completion_request(msg)
    total_tokens = response.usage.total_tokens
    output = json.loads(response.choices[0].message.content)

    return output,total_tokens


# In[22]:


@retry(wait=wait_random_exponential(min=1, max=40), stop=stop_after_attempt(3))
async def chat_request(messages: List[Dict], tools=None, model='gpt-4o-mini', stream=False) -> Any:
    """Make a chat completion request with retry logic."""
    try:
        params = {
            'messages': messages,
            'max_tokens': 1500,
            'model': model,
            'temperature': 0,
            'tools': tools,
            'tool_choice': "auto",
            'stream': stream,
        }
        response = await generate_response(params)
        return response
    except Exception as e:
        logging.error(f"Chat completion request failed: {str(e)}", exc_info=True)
        raise

tools = [
    {
        "type": "function",
        "function": {
            "name": "get_answer",
            "description": "Use this function to get answers based on context documents from database to user questions. The documents are purely based on user db.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "User query in string.",
                    },
                    "collection": {
                        "type": "string",
                        "description": "db collection name in string.",
                    }
                },
                "required": ["query","collection"],
            },
        },
    },

    {
        "type": "function",
        "function": {
            "name": "get_toc_content",
            "description": "Use this function to get answers based on index path of document.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "User query in string.",
                    },
                    "index_name": {
                        "type": "string",
                        "description": "index path in string.",
                    }
                },
                "required": ["query","index_name"],
            },
        },
    },

    {
        "type": "function",
        "function": {
            "name": "create_pdf_collection",
            "description": "Use this function to create database collection from pdf file path",
            "parameters": {
                "type": "object",
                "properties": {
                    "pdf_path": {
                        "type": "string",
                        "description": "pdf_path in string.",
                    }
                },
                "required": ["pdf_path"],
            },
        },
    },

    {
        "type": "function",
        "function": {
            "name": "get_collections",
            "description": "Use this function to check exsiting collection in database",

        },
    },
]

system_prompt = '''
Your task is to answer user questions. The user can ask a single question or multiple questions.
You have access to the below tools to return answers based on user single/multiple questions.

# Tool
"name": "get_collections",
"description": "Use this function to check exsiting collection in database",
"name": "create_pdf_collection",
"description": "Use this function to create database collection from pdf file path",
"name": "get_answer",
"description": "Use this function to get answers based on context documents from database to user questions. The documents are purely based on user db.",
"name": "get_toc_content",
"description": "Use this function to get answers based on index path of document.",

# Flow of User interaction
greet the user in first interaction.
and introduce yourself --> you can only provide answers to question based on collection exists in database and list the collections name
using get_collection tool

if user collection does not exist ask user to provide pdf path and create collection based on pdf path using tool create_pdf_collection
pdf path name will be collection name to use. collection creation takes few seconds to ask user to wait few seconds.

if index of document avaialble then first try get_toc_content tool to get answer for user query using index path
if answer not found satisfactory use get_answer tool to get answer for user query using collection name

if user provide collection name , that collection name will be used to answer user query using get_answer tool.
you can call get_answer tool multiple time for multiple questions

if user chat_history available then collection name based on latest chats will be used to answer query

YOUR ONLY TASK TO ANSWER USER QUERY BASED ON COLLECTION/indexpath AVAILABLE ON DATABASE.
'''









# out = await chat_bot(chat_history)



Fetching 30 files:   0%|          | 0/30 [00:00<?, ?it/s]

In [21]:

# async def chat_bot(chat_history,recursion_step=0):


#     chat_answer = await chat_request(chat_history, tools=tools, stream=False)
#     print('recursion step ',recursion_step)

#     if hasattr(chat_answer, 'choices') and chat_answer.choices:
#         message = chat_answer.choices[0].message
#         if hasattr(message, 'content') and message.content:
#             print(f"\nResponse: {message.content}")
#             assistant_msg = [{"role": "assistant", "content": message.content}]
#             chat_history += assistant_msg

#         if hasattr(message, 'tool_calls') and message.tool_calls:
#             output_json = {}
#             for call in message.tool_calls:

#                 if call.function.name == 'get_answer':  # Corrected dot notation

#                     print('searching answers...')
#                     arguments = json.loads(call.function.arguments)
#                     query = arguments['query']
#                     collection_name = arguments['collection']
#                     ans,_ = await get_answer(query,collection_name)
#                     output_json[query] = ans['answer']
#                     assistant_msg = [{"role": "assistant", "content": f"answer from database for {query}: {ans['answer']}"}]
#                     chat_history += assistant_msg

#                 if call.function.name == 'get_collections':

#                     print('fetching collection list...')
#                     collection_list = get_collections()
#                     if collection_list:
#                         assistant_msg = [{"role": "assistant", "content": f'database collection list {collection_list}'}]
#                     else:
#                         assistant_msg = [{"role": "assistant", "content": f'database collection list is empty'}]
#                     chat_history += assistant_msg
#                     chat_answer = await chat_bot(chat_history,recursion_step+1)

#                 if call.function.name == 'create_pdf_collection':

#                     print('creating collection...')
#                     arguments = json.loads(call.function.arguments)
#                     path = arguments['pdf_path']
#                     status,collection_name = create_pdf_collection(path)
#                     if status=='success':
#                         assistant_msg = [{"role": "assistant", "content": f'database collection created with collection name {collection_name}'}]
#                     else:
#                         assistant_msg = [{"role": "assistant", "content": f'database collection creation failed contact support'}]
#                     chat_history += assistant_msg
#                     chat_answer = await chat_bot(chat_history,recursion_step+1)


#             if output_json:
#                 print("\nGenerated JSON Output:")
#                 print(json.dumps(output_json, indent=2))
#                 # assistant_msg = [{"role": "assistant", "content": f' response from database {json.dumps(output_json, indent=2)}'}]
#                 # chat_history += assistant_msg
#                 chat_answer = await chat_bot(chat_history,recursion_step+1)
#             # assistant_msg

#     return chat_history

In [24]:
# async def chat_bot(chat_history, recursion_step=0):
#     chat_answer = await chat_request(chat_history, tools=tools, stream=False)
#     print('recursion step ', recursion_step)

#     if hasattr(chat_answer, 'choices') and chat_answer.choices:
#         message = chat_answer.choices[0].message
#         if hasattr(message, 'content') and message.content:
#             print(f"\nResponse: {message.content}")
#             assistant_msg = [{"role": "assistant", "content": message.content}]
#             chat_history += assistant_msg

#         if hasattr(message, 'tool_calls') and message.tool_calls:
#             output_json = {}
#             needs_recursive_call = False

#             for call in message.tool_calls:
#                 if call.function.name == 'get_answer':
#                     print('searching answers...')
#                     arguments = json.loads(call.function.arguments)
#                     query = arguments['query']
#                     collection_name = arguments['collection']
#                     ans, _ = await get_answer(query, collection_name)
#                     output_json[query] = ans['answer']
#                     assistant_msg = [{"role": "assistant", "content": f"answer from database for {query}: {ans['answer']}"}]
#                     chat_history += assistant_msg

#                 elif call.function.name == 'get_collections':
#                     print('fetching collection list...')
#                     collection_list = get_collections()
#                     if collection_list:
#                         assistant_msg = [{"role": "assistant", "content": f'database collection list {collection_list}'}]
#                     else:
#                         assistant_msg = [{"role": "assistant", "content": f'database collection list is empty'}]
#                     chat_history += assistant_msg
#                     needs_recursive_call = True

#                 elif call.function.name == 'create_pdf_collection':
#                     print('creating collection...')
#                     arguments = json.loads(call.function.arguments)
#                     path = arguments['pdf_path']
#                     status, collection_name = create_pdf_collection(path)
#                     if status == 'success':
#                         assistant_msg = [{"role": "assistant", "content": f'database collection created with collection name {collection_name}'}]
#                     else:
#                         assistant_msg = [{"role": "assistant", "content": f'database collection creation failed contact support'}]
#                     chat_history += assistant_msg
#                     needs_recursive_call = True

#             if output_json:
#                 print("\nGenerated JSON Output:")
#                 print(json.dumps(output_json, indent=2))

#             if needs_recursive_call:
#                 chat_answer = await chat_bot(chat_history, recursion_step+1)

#     return chat_history

In [27]:
# async def chat_bot(chat_history, recursion_step=0):
#     chat_answer = await chat_request(chat_history, tools=tools, stream=False)
#     print('recursion step ', recursion_step)

#     if hasattr(chat_answer, 'choices') and chat_answer.choices:
#         message = chat_answer.choices[0].message
#         if hasattr(message, 'content') and message.content:
#             print(f"\nResponse: {message.content}")
#             assistant_msg = [{"role": "assistant", "content": message.content}]
#             chat_history += assistant_msg

#         if hasattr(message, 'tool_calls') and message.tool_calls:
#             output_json = {}
#             tool_responses_added = False

#             for call in message.tool_calls:
#                 if call.function.name == 'get_answer':
#                     print('searching answers...')
#                     arguments = json.loads(call.function.arguments)
#                     query = arguments['query']
#                     collection_name = arguments['collection']
#                     ans, _ = await get_answer(query, collection_name)
#                     output_json[query] = ans['answer']
#                     assistant_msg = [{"role": "assistant", "content": f"answer from database for {query}: {ans['answer']}"}]
#                     chat_history += assistant_msg
#                     tool_responses_added = True

#                 elif call.function.name == 'get_collections':
#                     print('fetching collection list...')
#                     collection_list = get_collections()
#                     if collection_list:
#                         assistant_msg = [{"role": "assistant", "content": f'database collection list {collection_list}'}]
#                     else:
#                         assistant_msg = [{"role": "assistant", "content": f'database collection list is empty'}]
#                     chat_history += assistant_msg
#                     tool_responses_added = True

#                 elif call.function.name == 'create_pdf_collection':
#                     print('creating collection...')
#                     arguments = json.loads(call.function.arguments)
#                     path = arguments['pdf_path']
#                     status, collection_name = create_pdf_collection(path)
#                     if status == 'success':
#                         assistant_msg = [{"role": "assistant", "content": f'database collection created with collection name {collection_name}'}]
#                     else:
#                         assistant_msg = [{"role": "assistant", "content": f'database collection creation failed contact support'}]
#                     chat_history += assistant_msg
#                     tool_responses_added = True

#             if output_json:
#                 print("\nGenerated JSON Output:")
#                 print(json.dumps(output_json, indent=2))

#             # Make a single recursive call after tool responses are added
#             if tool_responses_added and recursion_step < 2:  # Limit recursion depth
#                 chat_answer = await chat_bot(chat_history, recursion_step + 1)

#     return chat_history

In [19]:
async def chat_bot_v2(chat_history, recursion_step=0, previous_actions=None):
    print(f'recursion step {recursion_step}, previous actions: {previous_actions}')

    previous_actions = previous_actions or set()
    chat_answer = await chat_request(chat_history, tools=tools, stream=False)

    if hasattr(chat_answer, 'choices') and chat_answer.choices:
        message = chat_answer.choices[0].message
        if hasattr(message, 'content') and message.content:
            print(f"\nResponse: {message.content}")
            assistant_msg = [{"role": "assistant", "content": message.content}]
            chat_history += assistant_msg

        if hasattr(message, 'tool_calls') and message.tool_calls:
            output_json = {}
            current_actions = set()
            tool_calls_processed = False

            has_get_answer = any(call.function.name == 'get_answer' for call in message.tool_calls)
            has_get_toc_content = any(call.function.name == 'get_toc_content' for call in message.tool_calls)

            for call in message.tool_calls:
                if call.function.name == 'get_answer' and (recursion_step == 0 or 'get_answer' not in previous_actions):
                    print('searching answers...')
                    arguments = json.loads(call.function.arguments)
                    query = arguments['query']
                    collection_name = arguments['collection']
                    ans, _ = await get_answer(query, collection_name)
                    output_json[query] = ans['answer']
                    assistant_msg = [{"role": "assistant", "content": f"answer from database for {query}: {ans['answer']}"}]
                    chat_history += assistant_msg
                    current_actions.add('get_answer')
                    tool_calls_processed = True

                elif call.function.name == 'get_toc_content' and (recursion_step == 0 or 'get_toc_content' not in previous_actions):
                    print('searching toc answers...')
                    arguments = json.loads(call.function.arguments)
                    query = arguments['query']
                    index_name = arguments['index_name']
                    ans, _ = await get_toc_content(query, index_name)
                    output_json[query] = ans['answer']
                    assistant_msg = [{"role": "assistant", "content": f"answer from database for {query}: {ans['answer']}"}]
                    chat_history += assistant_msg
                    current_actions.add('get_toc_content')
                    tool_calls_processed = True

                elif call.function.name == 'get_collections' and 'get_collections' not in previous_actions:
                    print('fetching collection list...')
                    collection_list = get_collections()
                    if collection_list:
                        assistant_msg = [{"role": "assistant", "content": f'database collection list {collection_list}'}]
                    else:
                        assistant_msg = [{"role": "assistant", "content": f'database collection list is empty'}]
                    chat_history += assistant_msg
                    current_actions.add('get_collections')
                    tool_calls_processed = True

                elif call.function.name == 'create_pdf_collection' and 'create_pdf_collection' not in previous_actions:
                    print('creating collection...')
                    arguments = json.loads(call.function.arguments)
                    path = arguments['pdf_path']
                    status, collection_name, index_info = await create_pdf_collection(path)
                    if status == 'success':
                        assistant_msg = [{"role": "assistant", "content": f'database collection created with collection name {collection_name} and index information at path: {index_info}'}]
                    else:
                        assistant_msg = [{"role": "assistant", "content": f'database collection creation failed contact support'}]
                    chat_history += assistant_msg
                    current_actions.add('create_pdf_collection')
                    tool_calls_processed = True

            if output_json:
                print("\nGenerated JSON Output:")
                print(json.dumps(output_json, indent=2))

            # Make recursive call only if:
            # 1. We processed some tool calls
            # 2. We're in step 0 and need assistant's final response
            if tool_calls_processed and recursion_step == 0 and (has_get_answer or has_get_toc_content):
                print(f'Making recursive call with previous_actions: {current_actions}')
                chat_answer = await chat_bot_v2(chat_history, recursion_step + 1, current_actions)

    return chat_history

In [10]:
rm -rf /content/chromadb_folder

In [20]:
# chat_history = []
# system_msg = [{"role": "system", "content": system_prompt}]
# chat_history = system_msg

while True:
    user_input = input('your input: ',).strip()
    if user_input.lower() == 'q':
        print('Response: Goodbye!!')
        break

    else:
        user_msg = [{"role": "user", "content": user_input}]
        chat_history = chat_history+user_msg
        tokens = count_tokens(json.dumps(chat_history))
        if tokens > 100000:
            print('Response: Context lenght exceeds. Quiting the chat. Please start fresh!!')
            break
        # chat_history = await chat_bot(chat_history)
        chat_history = await chat_bot_v2(chat_history)


your input: tell me about pay rise
recursion step 0, previous actions: None
searching answers...


You're using a XLMRobertaTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.



Generated JSON Output:
{
  "pay rise": "Depending on financial health and other Company factors, efforts will be made to give pay raises consistent with Zania, Inc. profitability, job performance, and the consumer price index. The Company may also make individual pay raises based on merit or due to a change of job position."
}
Making recursive call with previous_actions: {'get_answer'}
recursion step 1, previous actions: {'get_answer'}

Response: Here are the details about pay rises:

Depending on the financial health and other factors of the company, Zania, Inc. aims to provide pay raises that are consistent with the company's profitability, job performance, and the consumer price index. Additionally, individual pay raises may be granted based on merit or due to a change in job position.

If you have any more questions or need further information, feel free to ask!
your input: provide answer based on index
recursion step 0, previous actions: None
searching toc answers...
looking for 

In [22]:
chat_history

[{'role': 'system',
  'content': '\nYour task is to answer user questions. The user can ask a single question or multiple questions.\nYou have access to the below tools to return answers based on user single/multiple questions.\n\n# Tool\n"name": "get_collections",\n"description": "Use this function to check exsiting collection in database",\n"name": "create_pdf_collection",\n"description": "Use this function to create database collection from pdf file path",\n"name": "get_answer",\n"description": "Use this function to get answers based on context documents from database to user questions. The documents are purely based on user db.",\n                     \n# Flow of User interaction\ngreet the user in first interaction.\nand introduce yourself --> you can only provide answers to question based on collection exists in database and list the collections name\nusing get_collection tool\n\nif user collection does not exist ask user to provide pdf path and create collection based on pdf pat

In [15]:
test_history = [{'role': 'system',
  'content': '\nYour task is to answer user questions. The user can ask a single question or multiple questions.\nYou have access to the below tools to return answers based on user single/multiple questions.\n\n# Tool\n"name": "get_collections",\n"description": "Use this function to check exsiting collection in database",\n"name": "create_pdf_collection",\n"description": "Use this function to create database collection from pdf file path",\n"name": "get_answer",\n"description": "Use this function to get answers based on context documents from database to user questions. The documents are purely based on user db.",\n                     \n# Flow of User interaction\ngreet the user in first interaction.\nand introduce yourself --> you can only provide answers to question based on collection exists in database and list the collections name\nusing get_collection tool\n\nif user collection does not exist ask user to provide pdf path and create collection based on pdf path using tool create_pdf_collection\npdf path name will be collection name to use. collection creation takes few seconds to ask user to wait few seconds.\n\nif user provide collection name , that collection name will be used to answer user query using get_answer tool.\nyou can call get_answer tool multiple time for multiple questions\n\nif user chat_history available then collection name based on latest chats will be used to answer query\n\nYOUR ONLY TASK TO ANSWER USER QUERY BASED ON COLLECTION AVAILABLE ON DATABASE.\n'},
 {'role': 'user', 'content': 'hi'},
 {'role': 'assistant',
  'content': "Hello! I'm here to help you with your questions. I can provide answers based on specific collections in our database. Let me check the existing collections for you. Please hold on for a moment."},
 {'role': 'assistant', 'content': "database collection list ['handbook.pdf']"},
 {'role': 'assistant',
  'content': 'I found an existing collection named "handbook.pdf" in the database. You can ask me questions related to this collection. What would you like to know?'},
 {'role': 'user', 'content': f'answer all these what is company name, who is ceo of company'}]

In [16]:
res = await chat_request(test_history, tools=tools, stream=False)

In [17]:
res.choices[0]

Choice(finish_reason='tool_calls', index=0, logprobs=None, message=ChatCompletionMessage(content=None, refusal=None, role='assistant', audio=None, function_call=None, tool_calls=[ChatCompletionMessageToolCall(id='call_aWXlw9rqzFpTL0KCEvGAsTiY', function=Function(arguments='{"query": "What is the company name?", "collection": "handbook.pdf"}', name='get_answer'), type='function'), ChatCompletionMessageToolCall(id='call_oVjeFU18dZIy2LvYhpt4DwgZ', function=Function(arguments='{"query": "Who is the CEO of the company?", "collection": "handbook.pdf"}', name='get_answer'), type='function')]))

In [38]:
# import fitz
# check_pdf_prompt = ''' Analyze the page content and return True if page as table of content information.
# return json output {'toc': true or false}. pdf_page is
'''
# async def chat_completion_request(messages: List[Dict], model='gpt-4o-mini') -> Any:
#     """Make a chat completion request with retry logic."""
#     try:
#         params = {
#             'messages': messages,
#             'max_tokens': 500,
#             'model': model,
#             'temperature': 0,
#             'response_format': {"type": "json_object"}
#         }

#         response = await generate_response(params)
#         return response
#     except Exception as e:
#         logger.error(f"Chat completion request failed: {str(e)}", exc_info=True)
#         raise







SyntaxError: incomplete input (<ipython-input-38-aa6bfea7a4d6>, line 4)

In [36]:
# async def get_toc_content(query,index_name):


#   with open(f'{index_name}','r') as f:
#     content = f.read()
#     prompt = f'''
#   Your task is to return list of page no which may contain information regarding {query}
#   based on this table of content {content} output format: json page:[page no]
#   '''
#     msg = [{"role": "system", "content": prompt}]
#     response = await chat_completion_request(msg)
#     output = json.loads(response.choices[0].message.content)
#     pdf_path = index_name.split('_index')[0]
#     doc = fitz.open(pdf_path)
#     print('looking for context at pages: ', output['page'])
#     context = [doc.load_page(page_num-1).get_text() for page_num in output['page']]
#     return context

    #
    # doc_name = doc.name.split('/')[-1]
    # Iterate through each page


out = await get_toc_content('what is sick pay','pdf-rag/handbook.pdf_index.txt')


looking for context at pages:  [24, 40, 41]


"retains the discretion to determine the similarity of any available positions and your qualifications. If we are unable to\nreinstate you or you refuse the offer of reinstatement to a different position, your leave status will be changed to a voluntary\ntermination.\nFailure to Return from Leave\nIf you fail to return to work after an unpaid leave of absence, you will be considered to have resigned your employment.\nAlternative Employment\nWhile on an unpaid leave of absence, you may not work or be gainfully employed either for yourself or others unless\nexpress, written permission to perform such outside work has been granted by the Company. If you are on a leave of\nabsence and are found to be working elsewhere without permission, you will be subject to disciplinary action up to and\nincluding termination.\n7.6 Sick Pay\nZania, Inc. allows its regular full-time employees who have completed their introductory period [[#]] sick days per calendar\nyear. Notify your Manager as far in ad

In [31]:
count_tokens(' '.join(out))

1721

In [6]:

pip install pymupdf



In [7]:
import fitz

In [43]:
def extract_text_by_page(pdf_path):
    # Open the provided PDF file
    page_content_dict = {}
    doc = fitz.open(pdf_path)
    doc_name = doc.name.split('/')[-1]
    # Iterate through each page
    for page_num in range(len(doc)):
        page = doc.load_page(page_num)  # Load the current page
        text = page.get_text()  # Extract text from the page
        page_number = page_num+1

        if page_content_dict.get(page_number,None):
            page_content_dict[page_number]+='\n'.join([text])
        else:
            page_content_dict[page_number] = text

        # print(f"--- Page {page_num + 1} ---\n{text}")

    doc.close()

    return page_content_dict, doc_name

In [46]:
content_json,_ = extract_text_by_page('pdf-rag/handbook.pdf')

In [48]:
content_json[2]

'TABLE OF CONTENTS\nCORE POLICIES\n4\n1.0 WELCOME\n4\n1.1 A Welcome Policy\n4\n1.2 At-Will Employment\n4\n2.0 INTRODUCTORY LANGUAGE AND POLICIES\n5\n2.1 About the Company\n5\n2.2 Company Facilities\n5\n2.3 Ethics Code\n5\n2.4 Mission Statement\n5\n2.5 Our Organization\n5\n2.6 Revisions to Handbook\n5\n3.0 HIRING AND ORIENTATION POLICIES\n5\n3.1 Accommodations for Pregnant Employees\n5\n3.2 Conflicts of Interest\n6\n3.3 Employment Authorization Verification\n6\n3.4 Employment of Relatives and Friends\n6\n3.5 Job Descriptions\n7\n3.6 New Hires and Introductory Periods\n7\n3.7 Training Program\n7\n4.0 WAGE AND HOUR POLICIES\n7\n4.1 Attendance\n7\n4.2 Business Expenses\n7\n4.3 Direct Deposit\n8\n4.4 Employment Classifications\n8\n4.5 Introduction to Wage and Hour Policies\n8\n4.6 Job Abandonment\n9\n4.7 Paycheck Deductions\n9\n4.8 Recording Time\n9\n4.9 Travel Expenses\n10\n4.10 Use of Employer Credit Cards\n11\n5.0 PERFORMANCE, DISCIPLINE, LAYOFF, AND TERMINATION\n11\n5.1 Criminal Activit

In [2]:
!wget https://aimistanforddatasets01.blob.core.windows.net/chexpertchestxrays-u20210408?sv=2019-02-02&sr=c&sig=4E5H6PmEWeGe1ewPb8U%2FUbIYo%2Bc9zc%2BfU4jwWlM6AaU%3D&st=2025-01-10T13%3A55%3A06Z&se=2025-02-09T14%3A00%3A06Z&sp=rl

--2025-01-10 14:01:36--  https://aimistanforddatasets01.blob.core.windows.net/chexpertchestxrays-u20210408?sv=2019-02-02
Resolving aimistanforddatasets01.blob.core.windows.net (aimistanforddatasets01.blob.core.windows.net)... 20.60.229.1
Connecting to aimistanforddatasets01.blob.core.windows.net (aimistanforddatasets01.blob.core.windows.net)|20.60.229.1|:443... connected.
HTTP request sent, awaiting response... 404 The specified resource does not exist.
2025-01-10 14:01:36 ERROR 404: The specified resource does not exist..

